Elasticsearch — Урок 5.1 Массовые операции

_bulk API

_bulk API идеально подходит для индексирования или удаления больших наборов данных. Поддерживаются операции создания, индексирования, обновления и удаления. Для каждого запроса должны быть предоставлены метаданные, такие как имя индекса, тип, уникальный идентификатор и маршрутизация. Каждый запрос разделяется новой строкой. Объемный запрос может быть смесью запросов создания, индексации, обновления и удаления. Узел, получающий массовый запрос (также известный как координационный узел), группирует запросы с по осколкам, к котором они принадлежат, и выполняет их параллельно (параллельная обработка зависит от количества cpu). Пулы потоков, которые выполняют одиночные и массовые запросы, независимы.

Предположим, мы хотим добавить товар. Мы можем использовать массовый API для добавления документов в индекс следующим образом:

Затем, нам приспичило добавить еще одно поле shipping_price.

Поскольку мы работаем с одним и тем же индексом и типом, мы можем указать их в урле и опустить в метаданных, как показано ниже:

Update операция должна сначала получить исходный документ, а затем применить изменения. Между получением документа и его обновлением,  другой процесс может внести изменения в документ, чтобы этого избежать мы можем использовать _retry_on_conflict параметр, чтобы указать количество попыток повторного выполнения операции. Ответ предыдущего запроса показан ниже:

Затем, нам захотелось удалить документ и обновить другой:

Каждый запрос выполняется независимо, одна операция не влияет на другую. В предыдущем запросе мы попытались обновить документ (ID: 4), который не существует. Операция удаления выполнена успешно и в ответе отображается ошибка операции обновления. Флаг errors в начале ответа также устанавливается в true, указывая на наличие ошибок при выполнении массового запроса. Ответ предыдущего запроса выглядит следующим образом:

Не рекомендуется делать слишком большие массовые запросы, потом как перед выполнением он должен полностью загрузиться в оперативную память, лучше поэкспериментировать на реальном железе.

Если вы индексируете данные, которые не требуются для немедленного поиска, подумайте об отключении интервал обновления или увеличьте его. Это уменьшит нагрузку на систему и увеличит пропускную способность индекса. Вы можете временно отключить обновление, как показано ниже:

При массовом запросе данные индексирует сначала на первичных осколках, а затем в репликах. То бишь можно выключить реплики и включить их после завершения индексации. Удалите реплики только в том случае, если вы индексируете данные в первый раз или для неактивного индекса, поскольку удаление реплики может повлиять на пропускную способность запроса.

Multi Get API

Multi Get API используется для извлечения нескольких документов с использованием одного запроса. Здесь показан простой запрос на получение документов на основе уникального идентификатора:

Ответ предыдущего запроса выглядит следующим образом:

Для получения документов из разных индексов / типов используются  метаданные:

По умолчанию возвращается весь документ. Вы можете использовать _source для включения / исключения полей документа в ответе, как показано ниже:

Ответ на предыдущий запрос выглядит следующим образом:

_update_by_query API

Используется для обновления всех документов, соответствующих конкретному условию. В зависимости от количества документов в индексе это может быть очень ресурсоемкой операцией и может замедлить существующие операции поиска или индекса.

Например, для продуктов, которые мы индексировали, мы хотим добавить новое поле price_with_tax, которое содержит цену плюс налог. Мы будем использовать обновление по запросу и сценарий для расчета. Предполагая, 10%  налог, мы будем использовать цену за единицу, чтобы определить цену с налогом. Следующий запрос используется match_all для соответствия всем документам, но вы можете указать запрос, если хотите ограничить документы, которые хотите обновить.

_update_by_query API показано здесь:

Ответ на предыдущий запрос выглядит следующим образом:

Обновление выполняется путем создания моментального снимка существующего индекса и выполнения обновления для каждого документа. В предыдущем ответе вы можете видеть это, version_conflicts:0 у нас нет конфликтов. Конфликт версий происходит, если документ обновляется между моментальным снимком и его обновлением. По умолчанию, если возникает конфликт, операция завершается с ошибкой. Если вы хотите, чтобы операция продолжалась, несмотря на конфликты версий, вы можете указать так:

Запрос не прервется , если есть конфликт, и любые конфликты , которые происходят во время обновления указаны в ответ. В предыдущем ответе вы можете видеть, что количество партий ( batches:1) есть 1. Если индекс, который вы пытаетесь обновить, достаточно велик, вместо того, чтобы ждать ответа, вы можете попросить Elasticsearch ответить идентификатором задачи. Затем вы можете использовать API-интерфейс задачи для отслеживания прогресса или отмены задачи. Для того, чтобы Elasticsearch реагировать с идентификатором задачи, необходимо установить wait_for_completion в false качестве параметра запроса , как показано ниже:

Ответ на предыдущий запрос показан здесь. Он содержит идентификатор задачи:

Вы можете отслеживать состояние задачи, используя идентификатор задачи, следующим образом:

Ответ на состояние показан здесь:

В предыдущем ответе вы можете увидеть, что задача завершена, но вы также можете отменить задачу с помощью API отмены, как показано здесь:

_delete_by_query

Очень похоже на обновление по условию, но может быть использовано для удаления документов, соответствующих запросу. Он работает, делая снимок индекса и удаляя документы. Между выполнением моментального снимка и выполнением запроса удаления, если документы изменены, операция завершилась неудачно из-за конфликта версий. Чтобы продолжить работу с ошибками, можно указать параметр conflicts=proceed URL-адреса. Например, чтобы удалить все документы, содержащие слово shirt, запрос выглядит следующим образом:

Ответ на предыдущий запрос:

Вы можете установить wait_for_completion=false в качестве параметра URL, чтобы получить идентификатор задачи в качестве ответа на запрос и отслеживать ход запроса с помощью API задач.