Поток данных Elasticsearch - это уровень абстракции между именами, используемыми приложениями для облегчения операций ввода и поиска данных, и базовыми индексами, используемыми Elasticsearch для хранения этих данных. Потоки данных позволяют хранить данные временных рядов только в виде приложений в нескольких индексах, предоставляя при этом единый именованный ресурс для запросов.
Данные, отправленные в поток данных, хранятся в индексах с именем в следующем формате:
1 | .ds-<имя потока данных>-<yyy.MM.dd>-<номер поколения> |
Дата - это дата создания индекса (не путать с ежедневными индексами).
Номер поколения - это порядковый номер, который увеличивается на единицу при каждом переходе индекса.
1 | .ds-mylogs-2024.08.01-00002 |
Помимо этого, поток данных работает по большей части так же, как и обычный индекс, с большинством стандартных команд Elasticsearch (с некоторыми ограничениями, которые будут описаны далее в этой статье).
Варианты использования потоков данных
Потоки данных используются для таких вещей, как журналы, события и метрики. Ключевыми особенностями потоков данных являются:
- Данные, основанные на времени
- Очень редко обновляются (в идеале - никогда)
Различия между потоком данных и обычным индексом
Поток данных в основном работает так же, как и обычный индекс, с большинством стандартных команд Elasticsearch. Однако к потокам данных применимы следующие ограничения:
- Поток данных является уровнем абстракции - данные хранятся в нижележащих индексах .ds.
- Они должны содержать поле @timestamp, отображенное как дата или date_nanos.
- Удаление и обновление потоков может осуществляться только с помощью API "_update_by_query" или "_delete_by_query".
- Вы не можете создавать документы непосредственно в базовом индексе .ds, хотя вы можете обновлять или удалять данные непосредственно в индексе .ds.
- Вы не можете выполнять следующие операции с базовыми индексами .ds: клонировать, закрывать, удалять, замораживать, сокращать и разделять.
Как создать поток данных
Создайте политику ILM
Политика ILM будет управлять базовыми индексами. В данном случае мы определим, что индекс должен сворачиваться, когда размер шарда достигнет 50 ГБ, и что индекс будет удален, когда его возраст достигнет 60 дней.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | PUT /_ilm/policy/my-policy-delete60d { "policy": { "phases": { "hot": { "actions": { "rollover": { "max_size": "50GB" } } }, "delete": { "min_age": "60d", "actions": { "delete": {} } } } } } |
Создайте шаблон индекса для потока данных
Приведенный ниже шаблон определяет, что мы будем использовать только что созданную политику жизненного цикла.
Объект "data_stream":{} указывает на то, что это поток данных, а не обычный индекс. Здесь также можно добавить мапинги индексов и другие настройки, как и для обычного индекса.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | PUT /_index_template/my-logs-data-stream { "index_patterns": [ "logs-data-stream*" ], "data_stream": { }, "priority": 500, "template": { "settings": { "index.lifecycle.name": "my-policy-delete60d" }, "mappings":{ "properties": { "@timestamp":{ "type":"date" } } } } } |
Создание потока данных / индексирование
При отправке данных в имя потока данных поток данных будет создан автоматически с использованием спецификаций в шаблоне.
1 2 3 4 5 6 | POST /logs-data-stream-test/_doc/ { "@timestamp": "2024-04-07T12:02:15.000Z", "message": "Hello world" } |
Обратите внимание, что команда индексирования точно такая же, как и для обычного индекса. Именно шаблон индекса указывает Elasticsearch на создание потока данных, а не обычного индекса.
Для индексирования используется стандартный API массового индексирования, но помните - обновления и удаления не допускаются. Обратите внимание, что вместо PUT <index>/_doc/_id вы можете использовать следующий синтаксис для установки идентификатора документа:
1 2 3 4 5 6 | PUT /logs-data-stream-test/_create/my-id { "@timestamp": "2021-04-07T12:02:15.000Z", "message": "I just set the ID of this document" } |
Работа с потоками данных
Поиск в потоке данных
Нет никакой разницы между поиском в потоке данных и в обычном индексе.
Если вы используете поток данных, он будет искать во всех индексах подложки - то есть во всех данных, присутствующих в потоке данных.
1 2 3 4 5 6 7 8 | GET /logs*/_search { "query": { "match": { "message": "hello" } } } |
Удаление данных
Рекомендуемый способ удаления данных из потока данных - это политика ILM. Это означает, что индексы старше определенного возраста автоматически удаляются. Однако следует помнить, что дата хранения зависит от возраста индекса, поэтому индекс, созданный 60 дней назад, может содержать гораздо более свежие данные.
Можно удалять отдельные документы или наборы документов с помощью _delete_by_query. Этот запрос одинаков как для потоков данных, так и для обычных индексов.
1 2 3 4 5 6 7 8 | POST /logs*/_delete_by_query { "query": { "match": { "log_type": "alpha" } } } |
Также можно удалить весь поток данных и все базовые индексы:
1 | DELETE /_data_stream/logs-data-stream-test |
Если вы знаете имя базового индекса и ID документа, вы можете удалить документы из базового индекса следующим образом:
1 | DELETE /.ds-logs-data-stream-2024.04.18-000002/_doc/dfdpvnfBt7VVZ |
Обновление данных
Потоки данных не предназначены для случаев, когда требуется регулярно обновлять данные. Однако при необходимости вы можете обновить документы из базового индекса, выполнив следующие действия.
Сначала получите порядковый номер, номер первичного термина, идентификатор и базовый индекс удаляемого документа:
1 2 3 4 5 6 7 8 9 | GET /logs-data-stream/_search { "seq_no_primary_term": true, "query": { "match": { "id": "abc123" } } } |
Затем используйте все четыре элемента из вывода вышеуказанного запроса, чтобы выполнить следующее:
1 2 3 4 5 6 | PUT .ds-logs-data-stream-2024-04-03-000002/_doc/aasdfhghtRcc563?if_seq_no=0&if_primary_term=1 { "@timestamp": "2024-03-07T12:02:07.000Z", "id": "abc123" "message": "I updated this document" } |
Обновление мапингов
Вы можете обновлять динамические настройки или добавлять новые поля в поток данных аналогично тому, как вы делаете это в обычном индексе. Если поле является новым, вы можете добавить его в шаблон индекса.
Однако, как и в случае с обычными индексами, вы не можете изменить типы отображения после создания потока данных. Если вы измените тип отображения для существующего поля в шаблоне, то в результате получите два разных типа отображения (конфликт отображений) для одного и того же поля в потоке данных. Это сделает невозможным поиск в потоке данных по этому полю. Если вам нужно изменить тип отображения для существующего поля, то вам придется создать новый поток данных с соответствующими отображениями и заново проиндексировать весь поток данных.
Другие полезные команды
Чтобы увидеть базовые индексы потока данных, вы можете использовать эту команду:
1 | GET _data_stream/my-data-stream |