КатегорииElasticsearch

Elasticsearch — Урок 5.3 Ingest Node

Традиционно Logstash используется для предварительной обработки ваших данных перед индексированием в Elasticsearch. Используя Logstash, вы можете определить конвейеры для извлечения, преобразования и индексации ваших данных в Elasticsearch.

В Elasticsearch 5.0 был введен узел ingest. Используя узел ingest, можно определить конвейеры для изменения документов до индексирования. Конвейер — это серия процессоров, каждый из которых работает с одним или несколькими полями в документе. Наиболее часто используемые фильтры Logstash доступны как обработчики. Например, используя grok фильтр для извлечения данных из файла журнала Apache в документ, извлечение полей из JSON, изменение формата даты, вычисление геоданных из местоположения и т. д. Возможности безграничны. Elasticsearch поддерживает множество обработчиков из коробки. Вы также можете разрабатывать свои собственные процессоры с использованием любых JVM-языков.

По умолчанию все узлы в кластере могут выступать в качестве узлов ingest. Для добавления или удаления конвейера _ingest предоставляется API. API Ingest также поддерживает моделирование конвейера по данным документа для отладки и тестирования конвейера. Ниже показана базовая структура:

PUT _ingest/pipeline/<pipeline-name>
 {
   "pipeline": {
     "description": "pipeline-description",
     "processors": []
   }
 }

Как показано выше, принимают список processors, которые выполняются последовательно. Чтобы лучше объяснить API Ingest, давайте рассмотрим пример. Когда пользователь просматривает товар/элемент, сообщение регистрируется в следующем формате. В конце этого раздела мы определим конвейер для преобразования сообщения журнала в документ JSON и индексации документа в Elasticsearch. Мы также будем использовать скопления для поиска наиболее просматриваемых предметов:

{
   "message": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web"
}

Давайте определим конвейер для синтаксического анализа сообщения журнала с использованием grok процессора.

Grok — это не что иное, как набор регулярных выражений для анализа неструктурированных данных, таких как журналы, в структурированные данные.

Синтаксис шаблона grok %{SYNTAX:SEMANTIC}, синтаксис — это имя шаблона, а семантика — это имя поля.

Например, в %{IP:client} IP это шаблон, а client — это имя поля в документе JSON. Список предварительно определенных шаблонов можно найти тут https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

Мы можем использовать следующий grok шаблон для преобразования приведенного выше сообщения журнала в документ JSON:

"%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} 
%{WORD:platform}"

Перед созданием конвейера мы можем протестировать конвейер с использованием _simulate, как показано ниже:

POST _ingest/pipeline/_simulate
 {
   "pipeline": {
     "description": "Item View Pipeline",
     "processors": [
       {
         "grok": {
           "field": "log",
           "patterns": [
"%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}"
           ]
         }
       }
     ]
   },
   "docs": [
     {
       "_source": {
         "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web"
       }
     }
   ]
 }

Ответ:

 {
   "docs": [
     {
       "doc": {
         "_id": "_id",
         "_index": "_index",
         "_type": "_type",
         "_source": {
           "country": "USA",
           "itemId": "1",
           "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web",
           "client": "127.0.0.1",
           "platform": "Web",
           "timestamp": "2017-04-11T09:02:34.234+07:00"
         },
         "_ingest": {
           "timestamp": "2017-04-12T06:51:37.005+0000"
         }
       }
     }
   ]
 }

Вы можете видеть из предыдущего ответа, что сообщение журнала преобразуется в документ JSON с использованием процессора grok. Наряду с полями исходное сообщение журнала также сохраняется в поле log. Затем мы можем добавить процессор для удаления поля журнала из документа, как показано ниже:

POST _ingest/pipeline/_simulate
 {
   "pipeline": {
     "description": "Item View Pipeline",
     "processors": [
       {
         "grok": {
           "field": "log",
           "patterns": [
             "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}"
           ]
         }
       },
{
         "remove": {
           "field": "log"
         }
       }
     ]
   },
   "docs": [
     {
       "_source": {
         "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web"
       }
     }
   ]
 }

Ответ:

{
   "docs": [
     {
       "doc": {
         "_id": "_id",
         "_index": "_index",
         "_type": "_type",
         "_source": {
           "country": "USA",
           "itemId": "1",
           "client": "127.0.0.1",
           "platform": "Web",
           "timestamp": "2017-04-11T09:02:34.234+07:00"
         },
         "_ingest": {
           "timestamp": "2017-04-12T06:54:09.406+0000"
         }
       }
     }
   ]
 }

Полученный документ выглядит намного лучше, чем сообщение необработанного журнала. Затем мы хотим сохранить документ в месячном индексе. Все сообщения, зарегистрированные в одном месяце, относятся к одному и тому же индексу. Группировка журналов на основе временного интервала упрощает очистку или удаление старых журналов. Мы будем использовать date_index_name процессор, который вводит поле даты в качестве входных данных и выводит имя индекса, которому принадлежит документ:

POST _ingest/pipeline/_simulate
 {
   "pipeline": {
     "description": "Item View Pipeline",
     "processors": [
       {
         "grok": {
           "field": "log",
           "patterns": [
             "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}"
           ]
         }
       },
       {
         "remove": {
           "field": "log"
         }
       },
{
         "date_index_name": {
           "field": "timestamp",
           "index_name_prefix": "viewitem-",
           "date_rounding": "M",
           "index_name_format": "yyyy-MM"
         }
       }
     ]
   },
   "docs": [
     {
       "_source": {
         "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web"
       }
     }
   ]
 }

В предыдущем запросе мы использовали date_index_name процессор для определения индекса, к которому принадлежит документ. Значение index_name_prefix префикса добавлено в index_name_format. В предыдущем запросе имя индекса будет viewitem_2017_04. Поддерживаются следующие date_rounding параметры:

Единица времени Описание
y год
M месяц
w неделя
d день
h час
m минута
s секунда

Ответ:

{
   "docs": [
     {
       "doc": {
         "_id": "_id",
         "_index": "<viewitem_{2017_04||/M{yyyy_MM|UTC}}>",
         "_type": "_type",
         "_source": {
           "country": "USA",
           "itemId": "1",
           "client": "127.0.0.1",
           "platform": "Web",
           "timestamp": "2017-04-11T09:02:34.234+07:00"
         },
         "_ingest": {
           "timestamp": "2017-04-12T07:27:42.917+0000"
         }
       }
     }
   ]
 }

До сих пор мы использовали функцию имитации для проверки конвейера, теперь мы можем создать конвейер с помощью _ingest API PUT, как показано здесь:

PUT _ingest/pipeline/view_item_pipeline
 {
   "description": "Item View Pipeline",
   "processors": [
     {
       "grok": {
         "field": "log",
         "patterns": [
           "%{IP:client} %{TIMESTAMP_ISO8601:timestamp} %{WORD:country} %{NUMBER:itemId} %{WORD:platform}"
         ]
       }
     },
     {
       "remove": {
         "field": "log"
       }
     },
     {
       "date_index_name": {
         "field": "timestamp",
         "index_name_prefix": "viewitem_",
         "date_rounding": "M",
         "index_name_format": "yyyy_MM"
       }
     }
   ]
 }

Полный список поддерживаемых процессоров можно найти в официальной документации:

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html

Давайте проиндексируем пару документов, используя только что определенный конвейер:

 POST chapter5/log/?pipeline=view_item_pipeline
 {
   "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web "
 }

 POST chapter5/log/?pipeline=view_item_pipeline
 {
   "log": "127.0.0.1 2017-04-12T09:02:34.234+07:00 USA 2 Web "
 }


 POST chapter5/log/?pipeline=view_item_pipeline
 {
   "log": "127.0.0.1 2017-04-13T09:02:34.234+07:00 USA 3 Web "
 }


 POST chapter5/log/?pipeline=view_item_pipeline
 {
   "log": "127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web "
 }

Давайте посмотрим на ответ одного из предыдущих запросов:

{
   "_index": "viewitem_2017_04",
   "_type": "log",
   "_id": "AVyeUsnSiOAxqKpNGiK7",
   "_version": 1,
   "result": "created",
   "_shards": {
     "total": 2,
     "successful": 1,
     "failed": 0
   },
   "created": true
 }

Из предыдущего ответа вы можете видеть, что документ попал в viewitem_2017_04 индекс, хотя исходный запрос обращался в chapter5 индекс. Мы можем получить документ с помощью GET API, как показано ниже:

GET viewitem_2017_04/log/AVyeUsnSiOAxqKpNGiK7

Сообщение журнала «127.0.0.1 2017-04-11T09:02:34.234+07:00 USA 1 Web» преобразуется в документ, показанный ниже:

{
   "_index": "viewitem_2017_04",
   "_type": "log",
   "_id": "AVyeUsnSiOAxqKpNGiK7",
   "_version": 1,
   "found": true,
   "_source": {
     "country": "USA",
     "itemId": "1",
     "client": "127.0.0.1",
     "platform": "Web",
     "timestamp": "2017-04-11T09:02:34.234+07:00"
   }
 }

Затем мы можем использовать viewitem индекс для агрегирования, чтобы получить пятерку наиболее просматриваемых элементов.

Процессор Grok по умолчанию понимает все поля как текст. Вы можете определить шаблон индекса с сопоставлениями, если вы хотите изменить поведение по умолчанию

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *