_bulk API
_bulk API идеально подходит для индексирования или удаления больших наборов данных. Поддерживаются операции создания, индексирования, обновления и удаления. Для каждого запроса должны быть предоставлены метаданные, такие как имя индекса, тип, уникальный идентификатор и маршрутизация. Каждый запрос разделяется новой строкой. Объемный запрос может быть смесью запросов создания, индексации, обновления и удаления. Узел, получающий массовый запрос (также известный как координационный узел), группирует запросы с по осколкам, к котором они принадлежат, и выполняет их параллельно (параллельная обработка зависит от количества cpu). Пулы потоков, которые выполняют одиночные и массовые запросы, независимы.
Предположим, мы хотим добавить товар. Мы можем использовать массовый API для добавления документов в индекс следующим образом:
POST /_bulk { "create" : { "_index" : "chapter5", "_type" : "product", "_id" : "1"}} { "product_name" : "Long Sleeve Shirt - White", "unit_price" : "30" } { "index" : { "_index" : "chapter5", "_type" : "product", "_id" : "2"}} { "product_name" : "Button down Shirt", "unit_price" : "40" } { "create" : { "_index" : "chapter5", "_type" : "product", "_id" : "3"}} { "product_name" : "Non-Iron Solid Shirt", "unit_price" : "40" }
Затем, нам приспичило добавить еще одно поле shipping_price.
Поскольку мы работаем с одним и тем же индексом и типом, мы можем указать их в урле и опустить в метаданных, как показано ниже:
POST chapter5/product/_bulk { "update" : { "_id" : "1", "_retry_on_conflict" : 2 }} { "doc" : { "shipping_price" : 7.0 } } { "update" : { "_id" : "2", "_retry_on_conflict" : 2}} { "doc" : { "shipping_price" : 6.5 } } { "update" : { "_id" : "3", "_retry_on_conflict" : 2}} { "doc" : { "shipping_price" : 5.0 } }
Update операция должна сначала получить исходный документ, а затем применить изменения. Между получением документа и его обновлением, другой процесс может внести изменения в документ, чтобы этого избежать мы можем использовать _retry_on_conflict параметр, чтобы указать количество попыток повторного выполнения операции. Ответ предыдущего запроса показан ниже:
{ "took": 10, "errors": false, "items": [ { "update": { "_index": "chapter5", "_type": "product", "_id": "1", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } }, { "update": { "_index": "chapter5", "_type": "product", "_id": "2", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } }, { "update": { "_index": "chapter5", "_type": "product", "_id": "3", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } } ] }
Затем, нам захотелось удалить документ и обновить другой:
POST chapter5/product/_bulk { "delete" : { "_id" : "1" } } { "update" : { "_id" : "4", "_retry_on_conflict" : 2 }} { "doc" : { "unit_price" : 39.99 } }
Каждый запрос выполняется независимо, одна операция не влияет на другую. В предыдущем запросе мы попытались обновить документ (ID: 4), который не существует. Операция удаления выполнена успешно и в ответе отображается ошибка операции обновления. Флаг errors в начале ответа также устанавливается в true, указывая на наличие ошибок при выполнении массового запроса. Ответ предыдущего запроса выглядит следующим образом:
{ "took": 2, "errors": true, "items": [ { "delete": { "found": true, "_index": "chapter5", "_type": "product", "_id": "1", "_version": 4, "result": "deleted", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } }, { "update": { "_index": "chapter5", "_type": "product", "_id": "4", "status": 404, "error": { "type": "document_missing_exception", "reason": "[product][4]: document missing", "index_uuid": "K8unwaIZQTyGwUb4tFN3sQ", "shard": "2", "index": "chapter5" } } } ] }
Не рекомендуется делать слишком большие массовые запросы, потом как перед выполнением он должен полностью загрузиться в оперативную память, лучше поэкспериментировать на реальном железе.
Если вы индексируете данные, которые не требуются для немедленного поиска, подумайте об отключении интервал обновления или увеличьте его. Это уменьшит нагрузку на систему и увеличит пропускную способность индекса. Вы можете временно отключить обновление, как показано ниже:
PUT /chapter5/_settings { "index": { "refresh_interval": "-1" } }
При массовом запросе данные индексирует сначала на первичных осколках, а затем в репликах. То бишь можно выключить реплики и включить их после завершения индексации. Удалите реплики только в том случае, если вы индексируете данные в первый раз или для неактивного индекса, поскольку удаление реплики может повлиять на пропускную способность запроса.
Multi Get API
Multi Get API используется для извлечения нескольких документов с использованием одного запроса. Здесь показан простой запрос на получение документов на основе уникального идентификатора:
POST chapter5/product/_mget { "ids" : ["2", "3"] }
Ответ предыдущего запроса выглядит следующим образом:
{
"docs": [
{
"_index": "chapter5",
"_type": "product",
"_id": "2",
"_version": 2,
"found": true,
"_source": {
"product_name": "Button down Shirt",
"unit_price": "40",
"shipping_price": 6.5
}
},
{
"_index": "chapter5",
"_type": "product",
"_id": "3",
"_version": 2,
"found": true,
"_source": {
"product_name": "Non-Iron Solid Shirt",
"unit_price": "40",
"shipping_price": 5
}
}
]
}
Для получения документов из разных индексов / типов используются метаданные:
POST _mget
{
"docs": [
{
"_index": "chapter5",
"_type": "product",
"_id": "2"
},
{
"_index": "chapter5",
"_type": "product",
"_id": "3"
}
]
}
По умолчанию возвращается весь документ. Вы можете использовать _source для включения / исключения полей документа в ответе, как показано ниже:
POST _mget { "docs": [ { "_index": "chapter5", "_type": "product", "_id": "2", "_source": [ "product_name" ] }, { "_index": "chapter5", "_type": "product", "_id": "3", "_source": { "exclude": "unit_price" } } ] }
Ответ на предыдущий запрос выглядит следующим образом:
{ "docs": [ { "_index": "chapter5", "_type": "product", "_id": "2", "_version": 2, "found": true, "_source": { "product_name": "Button down Shirt" } }, { "_index": "chapter5", "_type": "product", "_id": "3", "_version": 2, "found": true, "_source": { "shipping_price": 5, "product_name": "Non-Iron Solid Shirt" } } ] }
_update_by_query API
Используется для обновления всех документов, соответствующих конкретному условию. В зависимости от количества документов в индексе это может быть очень ресурсоемкой операцией и может замедлить существующие операции поиска или индекса.
Например, для продуктов, которые мы индексировали, мы хотим добавить новое поле price_with_tax, которое содержит цену плюс налог. Мы будем использовать обновление по запросу и сценарий для расчета. Предполагая, 10% налог, мы будем использовать цену за единицу, чтобы определить цену с налогом. Следующий запрос используется match_all для соответствия всем документам, но вы можете указать запрос, если хотите ограничить документы, которые хотите обновить.
_update_by_query API показано здесь:
POST chapter5/_update_by_query { "script": { "inline": "ctx._source.price_with_tax = ctx._source.unit_price * 1.1", "lang": "painless" }, "query": { "match_all": {} } }
Ответ на предыдущий запрос выглядит следующим образом:
{ "took": 9, "timed_out": false, "total": 2, "updated": 2, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0, "failures": [] }
Обновление выполняется путем создания моментального снимка существующего индекса и выполнения обновления для каждого документа. В предыдущем ответе вы можете видеть это, version_conflicts:0 у нас нет конфликтов. Конфликт версий происходит, если документ обновляется между моментальным снимком и его обновлением. По умолчанию, если возникает конфликт, операция завершается с ошибкой. Если вы хотите, чтобы операция продолжалась, несмотря на конфликты версий, вы можете указать так:
POST chapter5/_update_by_query?conflicts=proceed { "script": { "inline": "ctx._source.price_with_tax = ctx._source.unit_price * 1.1", "lang": "painless" }, "query": { "match_all": {} } }
Запрос не прервется , если есть конфликт, и любые конфликты , которые происходят во время обновления указаны в ответ. В предыдущем ответе вы можете видеть, что количество партий ( batches:1) есть 1. Если индекс, который вы пытаетесь обновить, достаточно велик, вместо того, чтобы ждать ответа, вы можете попросить Elasticsearch ответить идентификатором задачи. Затем вы можете использовать API-интерфейс задачи для отслеживания прогресса или отмены задачи. Для того, чтобы Elasticsearch реагировать с идентификатором задачи, необходимо установить wait_for_completion в false качестве параметра запроса , как показано ниже:
POST chapter5/_update_by_query?wait_for_completion=false { "script": { "inline": "ctx._source.price_with_tax = ctx._source.unit_price * 1.1", "lang": "painless" }, "query": { "match_all": {} } }
Ответ на предыдущий запрос показан здесь. Он содержит идентификатор задачи:
{ "task": "xZjyFE19Q0yGxehcys6ydg:307842" }
Вы можете отслеживать состояние задачи, используя идентификатор задачи, следующим образом:
GET /_tasks/xZjyFE19Q0yGxehcys6ydg:307842
Ответ на состояние показан здесь:
{ "completed": true, "task": { "node": "xZjyFE19Q0yGxehcys6ydg", "id": 307842, "type": "transport", "action": "indices:data/write/update/byquery", "status": { "total": 2, "updated": 2, "created": 0, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0 }, "description": "", "start_time_in_millis": 1491805218867, "running_time_in_nanos": 5283799, "cancellable": true }, "response": { "took": 5, "timed_out": false, "total": 2, "updated": 2, "created": 0, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0, "failures": [] } }
В предыдущем ответе вы можете увидеть, что задача завершена, но вы также можете отменить задачу с помощью API отмены, как показано здесь:
POST /_tasks/xZjyFE19Q0yGxehcys6ydg:307842/_cancel
_delete_by_query
Очень похоже на обновление по условию, но может быть использовано для удаления документов, соответствующих запросу. Он работает, делая снимок индекса и удаляя документы. Между выполнением моментального снимка и выполнением запроса удаления, если документы изменены, операция завершилась неудачно из-за конфликта версий. Чтобы продолжить работу с ошибками, можно указать параметр conflicts=proceed URL-адреса. Например, чтобы удалить все документы, содержащие слово shirt, запрос выглядит следующим образом:
POST chapter5/_delete_by_query?conflicts=proceed { "query": { "match": { "product_name": "shirt" } } }
Ответ на предыдущий запрос:
{
"took": 5,
"timed_out": false,
"total": 2,
"deleted": 2,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1,
"throttled_until_millis": 0,
"failures": []
}
Вы можете установить wait_for_completion=false в качестве параметра URL, чтобы получить идентификатор задачи в качестве ответа на запрос и отслеживать ход запроса с помощью API задач.