КатегорииElasticsearch

Elasticsearch — Урок 5.1 Массовые операции

_bulk API

_bulk API идеально подходит для индексирования или удаления больших наборов данных. Поддерживаются операции создания, индексирования, обновления и удаления. Для каждого запроса должны быть предоставлены метаданные, такие как имя индекса, тип, уникальный идентификатор и маршрутизация. Каждый запрос разделяется новой строкой. Объемный запрос может быть смесью запросов создания, индексации, обновления и удаления. Узел, получающий массовый запрос (также известный как координационный узел), группирует запросы с по осколкам, к котором они принадлежат, и выполняет их параллельно (параллельная обработка зависит от количества cpu). Пулы потоков, которые выполняют одиночные и массовые запросы, независимы.

Предположим, мы хотим добавить товар. Мы можем использовать массовый API для добавления документов в индекс следующим образом:

POST /_bulk
 { "create" : { "_index" : "chapter5", "_type" : "product", "_id" : "1"}}
 { "product_name" : "Long Sleeve Shirt - White", "unit_price" : "30" }
 { "index" : { "_index" : "chapter5", "_type" : "product", "_id" : "2"}}
 { "product_name" : "Button down Shirt", "unit_price" : "40" }
 { "create" : { "_index" : "chapter5", "_type" : "product", "_id" : "3"}}
 { "product_name" : "Non-Iron Solid Shirt", "unit_price" : "40" }

Затем, нам приспичило добавить еще одно поле shipping_price.

Поскольку мы работаем с одним и тем же индексом и типом, мы можем указать их в урле и опустить в метаданных, как показано ниже:

POST chapter5/product/_bulk
 { "update" : { "_id" : "1", "_retry_on_conflict" : 2 }}
 { "doc" : { "shipping_price" : 7.0 } }
 { "update" : { "_id" : "2", "_retry_on_conflict" : 2}}
 { "doc" : { "shipping_price" : 6.5 } }
 { "update" : { "_id" : "3", "_retry_on_conflict" : 2}}
 { "doc" : { "shipping_price" : 5.0 } }

Update операция должна сначала получить исходный документ, а затем применить изменения. Между получением документа и его обновлением,  другой процесс может внести изменения в документ, чтобы этого избежать мы можем использовать _retry_on_conflict параметр, чтобы указать количество попыток повторного выполнения операции. Ответ предыдущего запроса показан ниже:

{
   "took": 10,
   "errors": false,
   "items": [
     {
       "update": {
         "_index": "chapter5",
         "_type": "product",
         "_id": "1",
         "_version": 2,
         "result": "updated",
         "_shards": {
           "total": 2,
           "successful": 1,
           "failed": 0
         },
         "status": 200
       }
     },
     {
       "update": {
         "_index": "chapter5",
         "_type": "product",
         "_id": "2",
         "_version": 2,
         "result": "updated",
         "_shards": {
           "total": 2,
           "successful": 1,
           "failed": 0
         },
         "status": 200
       }
     },
     {
       "update": {
         "_index": "chapter5",
         "_type": "product",
         "_id": "3",
         "_version": 2,
         "result": "updated",
         "_shards": {
           "total": 2,
           "successful": 1,
           "failed": 0
         },
         "status": 200
       }
     }
   ]
 }

Затем, нам захотелось удалить документ и обновить другой:

 POST chapter5/product/_bulk
 { "delete" : { "_id" : "1" } }
 { "update" : { "_id" : "4", "_retry_on_conflict" : 2 }}
 { "doc" : { "unit_price" : 39.99 } }

Каждый запрос выполняется независимо, одна операция не влияет на другую. В предыдущем запросе мы попытались обновить документ (ID: 4), который не существует. Операция удаления выполнена успешно и в ответе отображается ошибка операции обновления. Флаг errors в начале ответа также устанавливается в true, указывая на наличие ошибок при выполнении массового запроса. Ответ предыдущего запроса выглядит следующим образом:

{
   "took": 2,
   "errors": true,
   "items": [
     {
       "delete": {
         "found": true,
         "_index": "chapter5",
         "_type": "product",
         "_id": "1",
         "_version": 4,
         "result": "deleted",
         "_shards": {
           "total": 2,
           "successful": 1,
           "failed": 0
         },
         "status": 200
       }
     },
     {
         "update": {
         "_index": "chapter5",
         "_type": "product",
         "_id": "4",
         "status": 404,
         "error": {
         "type": "document_missing_exception",
           "reason": "[product][4]: document missing",
           "index_uuid": "K8unwaIZQTyGwUb4tFN3sQ",
           "shard": "2",
           "index": "chapter5"
         }
       }
     }
   ]
 }

Не рекомендуется делать слишком большие массовые запросы, потом как перед выполнением он должен полностью загрузиться в оперативную память, лучше поэкспериментировать на реальном железе.

Если вы индексируете данные, которые не требуются для немедленного поиска, подумайте об отключении интервал обновления или увеличьте его. Это уменьшит нагрузку на систему и увеличит пропускную способность индекса. Вы можете временно отключить обновление, как показано ниже:

PUT /chapter5/_settings
 {
   "index": {
     "refresh_interval": "-1"
   }
 }

При массовом запросе данные индексирует сначала на первичных осколках, а затем в репликах. То бишь можно выключить реплики и включить их после завершения индексации. Удалите реплики только в том случае, если вы индексируете данные в первый раз или для неактивного индекса, поскольку удаление реплики может повлиять на пропускную способность запроса.

Multi Get API

Multi Get API используется для извлечения нескольких документов с использованием одного запроса. Здесь показан простой запрос на получение документов на основе уникального идентификатора:

 POST chapter5/product/_mget
 {
    "ids" : ["2", "3"]
 }

Ответ предыдущего запроса выглядит следующим образом:

{
   "docs": [
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "2",
       "_version": 2,
       "found": true,
       "_source": {
         "product_name": "Button down Shirt",
         "unit_price": "40",
         "shipping_price": 6.5
       }
     },
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "3",
       "_version": 2,
       "found": true,
       "_source": {
         "product_name": "Non-Iron Solid Shirt",
         "unit_price": "40",
         "shipping_price": 5
       }
     }
   ]
 }

Для получения документов из разных индексов / типов используются  метаданные:

POST _mget
 {
   "docs": [
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "2"
     },
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "3"
     }
   ]
 }

По умолчанию возвращается весь документ. Вы можете использовать _source для включения / исключения полей документа в ответе, как показано ниже:

POST _mget
 {
   "docs": [
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "2",
       "_source": [
          "product_name"
       ]
     },
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "3",
       "_source": {
          "exclude": "unit_price"
       }
     }
   ]
 }

Ответ на предыдущий запрос выглядит следующим образом:

{
   "docs": [
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "2",
       "_version": 2,
       "found": true,
       "_source": {
         "product_name": "Button down Shirt"
       }
     },
     {
       "_index": "chapter5",
       "_type": "product",
       "_id": "3",
       "_version": 2,
       "found": true,
       "_source": {
         "shipping_price": 5,
         "product_name": "Non-Iron Solid Shirt"
       }
     }
   ]
 }

_update_by_query API

Используется для обновления всех документов, соответствующих конкретному условию. В зависимости от количества документов в индексе это может быть очень ресурсоемкой операцией и может замедлить существующие операции поиска или индекса.

Например, для продуктов, которые мы индексировали, мы хотим добавить новое поле price_with_tax, которое содержит цену плюс налог. Мы будем использовать обновление по запросу и сценарий для расчета. Предполагая, 10%  налог, мы будем использовать цену за единицу, чтобы определить цену с налогом. Следующий запрос используется match_all для соответствия всем документам, но вы можете указать запрос, если хотите ограничить документы, которые хотите обновить.

_update_by_query API показано здесь:

 POST chapter5/_update_by_query
 {
   "script": {
     "inline": "ctx._source.price_with_tax = ctx._source.unit_price * 1.1",
     "lang": "painless"
   },
   "query": {
     "match_all": {}
   }
 }

Ответ на предыдущий запрос выглядит следующим образом:

{
   "took": 9,
   "timed_out": false,
   "total": 2,
   "updated": 2,
   "deleted": 0,
   "batches": 1,
   "version_conflicts": 0,
   "noops": 0,
   "retries": {
     "bulk": 0,
     "search": 0
   },
   "throttled_millis": 0,
   "requests_per_second": -1,
   "throttled_until_millis": 0,
   "failures": []
 }

Обновление выполняется путем создания моментального снимка существующего индекса и выполнения обновления для каждого документа. В предыдущем ответе вы можете видеть это, version_conflicts:0 у нас нет конфликтов. Конфликт версий происходит, если документ обновляется между моментальным снимком и его обновлением. По умолчанию, если возникает конфликт, операция завершается с ошибкой. Если вы хотите, чтобы операция продолжалась, несмотря на конфликты версий, вы можете указать так:

 POST chapter5/_update_by_query?conflicts=proceed
 {
   "script": {
     "inline": "ctx._source.price_with_tax = ctx._source.unit_price * 1.1",
     "lang": "painless"
   },
   "query": {
     "match_all": {}
   }
 }

Запрос не прервется , если есть конфликт, и любые конфликты , которые происходят во время обновления указаны в ответ. В предыдущем ответе вы можете видеть, что количество партий ( batches:1) есть 1. Если индекс, который вы пытаетесь обновить, достаточно велик, вместо того, чтобы ждать ответа, вы можете попросить Elasticsearch ответить идентификатором задачи. Затем вы можете использовать API-интерфейс задачи для отслеживания прогресса или отмены задачи. Для того, чтобы Elasticsearch реагировать с идентификатором задачи, необходимо установить wait_for_completion в false качестве параметра запроса , как показано ниже:

 POST chapter5/_update_by_query?wait_for_completion=false
 {
   "script": {
     "inline": "ctx._source.price_with_tax = ctx._source.unit_price * 1.1",
     "lang": "painless"
   },
   "query": {
     "match_all": {}
   }
 }

Ответ на предыдущий запрос показан здесь. Он содержит идентификатор задачи:

 {
   "task": "xZjyFE19Q0yGxehcys6ydg:307842"
 }

Вы можете отслеживать состояние задачи, используя идентификатор задачи, следующим образом:

GET /_tasks/xZjyFE19Q0yGxehcys6ydg:307842

Ответ на состояние показан здесь:

{
"completed": true,
   "task": {
     "node": "xZjyFE19Q0yGxehcys6ydg",
     "id": 307842,
     "type": "transport",
     "action": "indices:data/write/update/byquery",
     "status": {
       "total": 2,
       "updated": 2,
       "created": 0,
       "deleted": 0,
       "batches": 1,
       "version_conflicts": 0,
       "noops": 0,
       "retries": {
         "bulk": 0,
         "search": 0
       },
       "throttled_millis": 0,
       "requests_per_second": -1,
       "throttled_until_millis": 0
     },
     "description": "",
     "start_time_in_millis": 1491805218867,
     "running_time_in_nanos": 5283799,
     "cancellable": true
   },
   "response": {
     "took": 5,
     "timed_out": false,
     "total": 2,
     "updated": 2,
     "created": 0,
     "deleted": 0,
     "batches": 1,
     "version_conflicts": 0,
     "noops": 0,
     "retries": {
       "bulk": 0,
       "search": 0
     },
     "throttled_millis": 0,
     "requests_per_second": -1,
     "throttled_until_millis": 0,
     "failures": []
   }
 }

В предыдущем ответе вы можете увидеть, что задача завершена, но вы также можете отменить задачу с помощью API отмены, как показано здесь:

POST /_tasks/xZjyFE19Q0yGxehcys6ydg:307842/_cancel

_delete_by_query

Очень похоже на обновление по условию, но может быть использовано для удаления документов, соответствующих запросу. Он работает, делая снимок индекса и удаляя документы. Между выполнением моментального снимка и выполнением запроса удаления, если документы изменены, операция завершилась неудачно из-за конфликта версий. Чтобы продолжить работу с ошибками, можно указать параметр conflicts=proceed URL-адреса. Например, чтобы удалить все документы, содержащие слово shirt, запрос выглядит следующим образом:

POST chapter5/_delete_by_query?conflicts=proceed
 {
   "query": {
     "match": {
       "product_name": "shirt"
     }
   }
 }

Ответ на предыдущий запрос:

{
   "took": 5,
   "timed_out": false,
   "total": 2,
   "deleted": 2,
   "batches": 1,
   "version_conflicts": 0,
   "noops": 0,
   "retries": {
     "bulk": 0,
     "search": 0
   },
   "throttled_millis": 0,
   "requests_per_second": -1,
   "throttled_until_millis": 0,
   "failures": []
 }

Вы можете установить wait_for_completion=false в качестве параметра URL, чтобы получить идентификатор задачи в качестве ответа на запрос и отслеживать ход запроса с помощью API задач.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *