В Elasticsearch 5.0 был введен узел ingest. Используя узел ingest, можно определить конвейеры для изменения документов до индексирования. Конвейер — это серия процессоров, каждый из которых работает с одним или несколькими полями в документе. Наиболее часто используемые фильтры Logstash доступны как обработчики. Например, используя grok фильтр для извлечения данных из файла журнала Apache в документ, извлечение полей из JSON, изменение формата даты, вычисление геоданных из местоположения и т. д. Возможности безграничны. Elasticsearch поддерживает множество обработчиков из коробки. Вы также можете разрабатывать свои собственные процессоры с использованием любых JVM-языков.
По умолчанию все узлы в кластере могут выступать в качестве узлов ingest. Для добавления или удаления конвейера _ingest предоставляется API. API Ingest также поддерживает моделирование конвейера по данным документа для отладки и тестирования конвейера. Ниже показана базовая структура:
PUT _ingest/pipeline/<pipeline-name> { "pipeline": { "description": "pipeline-description", "processors": [] } }
Как показано выше, принимают список processors, которые выполняются последовательно. Чтобы лучше объяснить API Ingest, давайте рассмотрим пример. Когда пользователь просматривает товар/элемент, сообщение регистрируется в следующем формате. В конце этого раздела мы определим конвейер для преобразования сообщения журнала в документ JSON и индексации документа в Elasticsearch. Мы также будем использовать скопления для поиска наиболее просматриваемых предметов:
{ "message": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web" }
Давайте определим конвейер для синтаксического анализа сообщения журнала с использованием grok процессора.
Grok — это не что иное, как набор регулярных выражений для анализа неструктурированных данных, таких как журналы, в структурированные данные.
Синтаксис шаблона grok %{SYNTAX:SEMANTIC}, синтаксис — это имя шаблона, а семантика — это имя поля.
Например, в %{IP:client} IP это шаблон, а client — это имя поля в документе JSON. Список предварительно определенных шаблонов можно найти тут https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
Мы можем использовать следующий grok шаблон для преобразования приведенного выше сообщения журнала в документ JSON:
"%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}"
Перед созданием конвейера мы можем протестировать конвейер с использованием _simulate, как показано ниже:
POST _ingest/pipeline/_simulate { "pipeline": { "description": "Item View Pipeline", "processors": [ { "grok": { "field": "log", "patterns": [ "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}" ] } } ] }, "docs": [ { "_source": { "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web" } } ] }
Ответ:
{ "docs": [ { "doc": { "_id": "_id", "_index": "_index", "_type": "_type", "_source": { "country": "USA", "itemId": "1", "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web", "client": "127.0.0.1", "platform": "Web", "timestamp": "2017-04-11T09:02:34.234+07:00" }, "_ingest": { "timestamp": "2017-04-12T06:51:37.005+0000" } } } ] }
Вы можете видеть из предыдущего ответа, что сообщение журнала преобразуется в документ JSON с использованием процессора grok. Наряду с полями исходное сообщение журнала также сохраняется в поле log. Затем мы можем добавить процессор для удаления поля журнала из документа, как показано ниже:
POST _ingest/pipeline/_simulate { "pipeline": { "description": "Item View Pipeline", "processors": [ { "grok": { "field": "log", "patterns": [ "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}" ] } }, { "remove": { "field": "log" } } ] }, "docs": [ { "_source": { "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web" } } ] }
Ответ:
{
"docs": [
{
"doc": {
"_id": "_id",
"_index": "_index",
"_type": "_type",
"_source": {
"country": "USA",
"itemId": "1",
"client": "127.0.0.1",
"platform": "Web",
"timestamp": "2017-04-11T09:02:34.234+07:00"
},
"_ingest": {
"timestamp": "2017-04-12T06:54:09.406+0000"
}
}
}
]
}
Полученный документ выглядит намного лучше, чем сообщение необработанного журнала. Затем мы хотим сохранить документ в месячном индексе. Все сообщения, зарегистрированные в одном месяце, относятся к одному и тому же индексу. Группировка журналов на основе временного интервала упрощает очистку или удаление старых журналов. Мы будем использовать date_index_name процессор, который вводит поле даты в качестве входных данных и выводит имя индекса, которому принадлежит документ:
POST _ingest/pipeline/_simulate { "pipeline": { "description": "Item View Pipeline", "processors": [ { "grok": { "field": "log", "patterns": [ "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}" ] } }, { "remove": { "field": "log" } }, { "date_index_name": { "field": "timestamp", "index_name_prefix": "viewitem-", "date_rounding": "M", "index_name_format": "yyyy-MM" } } ] }, "docs": [ { "_source": { "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web" } } ] }
В предыдущем запросе мы использовали date_index_name процессор для определения индекса, к которому принадлежит документ. Значение index_name_prefix префикса добавлено в index_name_format. В предыдущем запросе имя индекса будет viewitem_2017_04. Поддерживаются следующие date_rounding параметры:
Единица времени | Описание |
y | год |
M | месяц |
w | неделя |
d | день |
h | час |
m | минута |
s | секунда |
Ответ:
{ "docs": [ { "doc": { "_id": "_id", "_index": "<viewitem_{2017_04||/M{yyyy_MM|UTC}}>", "_type": "_type", "_source": { "country": "USA", "itemId": "1", "client": "127.0.0.1", "platform": "Web", "timestamp": "2017-04-11T09:02:34.234+07:00" }, "_ingest": { "timestamp": "2017-04-12T07:27:42.917+0000" } } } ] }
До сих пор мы использовали функцию имитации для проверки конвейера, теперь мы можем создать конвейер с помощью _ingest API PUT, как показано здесь:
PUT _ingest/pipeline/view_item_pipeline { "description": "Item View Pipeline", "processors": [ { "grok": { "field": "log", "patterns": [ "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}" ] } }, { "remove": { "field": "log" } }, { "date_index_name": { "field": "timestamp", "index_name_prefix": "viewitem_", "date_rounding": "M", "index_name_format": "yyyy_MM" } } ] }
Полный список поддерживаемых процессоров можно найти в официальной документации:
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html
Давайте проиндексируем пару документов, используя только что определенный конвейер:
POST chapter5/log/?pipeline=view_item_pipeline { "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web " } POST chapter5/log/?pipeline=view_item_pipeline { "log": "127.0.0.1 2017-04-12T09:02:34.234+07:00 USA 2 Web " } POST chapter5/log/?pipeline=view_item_pipeline { "log": "127.0.0.1 2017-04-13T09:02:34.234+07:00 USA 3 Web " } POST chapter5/log/?pipeline=view_item_pipeline { "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web " }
Давайте посмотрим на ответ одного из предыдущих запросов:
{ "_index": "viewitem_2017_04", "_type": "log", "_id": "AVyeUsnSiOAxqKpNGiK7", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true }
Из предыдущего ответа вы можете видеть, что документ попал в viewitem_2017_04 индекс, хотя исходный запрос обращался в chapter5 индекс. Мы можем получить документ с помощью GET API, как показано ниже:
GET viewitem_2017_04/log/AVyeUsnSiOAxqKpNGiK7
Сообщение журнала «127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web» преобразуется в документ, показанный ниже:
{ "_index": "viewitem_2017_04", "_type": "log", "_id": "AVyeUsnSiOAxqKpNGiK7", "_version": 1, "found": true, "_source": { "country": "USA", "itemId": "1", "client": "127.0.0.1", "platform": "Web", "timestamp": "2017-04-11T09:02:34.234+07:00" } }
Затем мы можем использовать viewitem индекс для агрегирования, чтобы получить пятерку наиболее просматриваемых элементов.
Процессор Grok по умолчанию понимает все поля как текст. Вы можете определить шаблон индекса с сопоставлениями, если вы хотите изменить поведение по умолчанию