Как использовать конвейеры ввода для прозрачного преобразования данных в OpenSearch

Вы хотите внести некоторые изменения в свои данные, но при этом хотели бы использовать более легкий метод, чем Logstash или другой инструмент для разбора данных? Конвейеры ввода данных могут оказаться именно тем, что вы ищете.

Для чего используется ingest pipeline

С помощью конвейеров ввода вы можете манипулировать данными в соответствии с вашими потребностями без особых накладных расходов. Конвейеры ввода располагаются внутри узла OpenSearch (узла ввода, если вы его определили) и выполняют определенный набор изменений в ваших данных, которые вы определяете. Эти изменения называются "процессорами", и каждый из них выполняет для вас определенную задачу. Набор таких процессоров называется конвейером ввода и, будучи определенным, может использоваться для манипулирования данными при их вводе в систему.

В качестве примера рассмотрим следующую структуру данных:

С помощью конвейеров ввода можно изменять указанные выше данные следующим образом:

  • Переименовать поля: Например, изменить "first_name" на "firstName".
  • Удалить поля: Например, удалить поле "email".
  • Разделить поля, чтобы превратить значение в массив, используя разделитель, а не строку: Например, превратить "activities" из "Soccer, Cooking, Eating" в [ "Soccer","Cooking", "Eating"].
  • Выполнить поиск GeoIP по полю
  • Запуск скрипта для наибольшей гибкости (с простым синтаксисом): Например, сложение двух полей и деление на другое, или кодирование конфиденциальных данных.
  • Конвертировать поля: Например, изменить тип поля со строки на int.
  • Обогащать документы, выполняя поиск для добавления дополнительной информации к каждому событию: Т.е. Дополнительная информация ниже.

Если вы хотите импортировать данные в более сыром виде:

Вы также можете использовать конвейеры ввода данных для:

  • Разбор полей с помощью grok или dissect: Например, сохранить "2021-05-11..." в поле "дата"; "mytest.example.com" в поле "origin"; и "Похоже, возникла проблема" в "raw_message".
  • Разберите csv на поля: Т.е. во втором примере используйте запятую, разделитель и назовите первое значение "date"; второе "origin"; третье "raw_message".

Как создавать конвейеры ввода документов

Ввод документов осуществляется на узле OpenSearch, которому присвоена роль "ingest" (если вы не настроили узел на определенную роль, то он сможет вводить данные по умолчанию).

Вы можете создавать конвейеры ввода и затем определять конвейер, через который вы хотите пропустить данные:

  • Ваш массовый POST в OpenSearch, или
  • через шаблон индекса для определения конвейера, который будет использоваться для всех индексов, соответствующих определенному шаблону.
  • А также с помощью нескольких других методов, которые мы не будем рассматривать в этом руководстве, но которые также работают, например: Logstash, конечную точку _update_by_query и конечную точку _reindex.

Теперь давайте замажем руки и рассмотрим, как на самом деле создать конвейер ввода и как проверить его работоспособность. Если у вас открыта Kibana, перейдите в раздел Dev Tools на вкладке Management. (Вы также можете использовать новый пользовательский интерфейс Ingest Node Pipeline в Kibana для более удобного создания конвейеров, перейдя в раздел "Stack Management" > "Ingest Node Pipelines").

Как протестировать конвейер

Для тестирования трубопровода перед его созданием можно воспользоваться конечной точкой "_simulate":

Вы можете добавлять один за другим различные процессоры и проверять, возвращается ли ожидаемый результат, используя примеры документов, которые вы предоставили в массиве "docs". Убедившись, что конвейер работает так, как нужно, можно создать конвейер с именем "test":

Не волнуйтесь, если вы допустили ошибку - вы всегда можете повторно отправить команду PUT, и она обновит конвейер.

Вы также можете удалить его полностью с помощью команды:

Как использовать процессор enrich

С помощью процессора enrich можно импортировать индекс, а затем использовать этот индекс для статического поиска во входящих данных с целью добавления дополнительных полей.

Например, если в ваш набор данных импортируются различные продукты питания, и вы хотите узнать, какого они цвета, вы можете импортировать индекс, который позволит вам "присоединиться" к полю "fruit" и импортировать связанные с ним поля.

Если ваш входящий источник данных выглядит следующим образом:

Если же вы хотите добавить цвет каждого из них, полученный из отдельного индекса, то можно выполнить следующие действия.

Сначала импортируйте данные поиска:

Затем создайте политику обогащения и выполните ее для построения индекса обогащения:

Теперь вы можете использовать политику обогащения "color_lookup" через процессор обогащения в любом конвейере.

В данном примере мы использовали одно поле обогащения, но вы можете добавить любое количество полей.

Следует помнить, что если вы хотите обновить индекс поиска, то вам необходимо обновить индекс новыми документами и повторно выполнить команду "_execute" для обновления поиска. Однако такое обновление будет **только для будущих данных, все предыдущие данные будут иметь прежние значения для поиска**.

Эти процессоры конвейера обогащения могут быть очень полезны для тех, кто хочет использовать отдельный источник данных для добавления значимой информации к поступающим данным. Можно добавить несколько таких процессоров или ограничиться одним.

Конвейеры ввода - это отличный легкий способ очистки и улучшения данных OpenSearch в соответствии с вашими потребностями.

Понравилась статья? Поделиться с друзьями:
Добавить комментарий