Используйте базовую конфигурацию консьюмера с дополнительными свойствами, подходящими для конкретных случаев использования.
При настройке консьюмеров вы должны убедиться, что они эффективно справляются с объемом поступающих данных. Как и при настройке производителя, будьте готовы вносить постепенные изменения до тех пор, пока потребители не будут работать так, как ожидается.
Базовая конфигурация консьюмера
Свойства подключения и десериализатора необходимы для каждого консьюмера. Как правило, хорошей практикой является добавление идентификатора клиента для отслеживания.
В конфигурации консьюмера, независимо от любой последующей конфигурации:
- Консьюмер получает сообщения с заданного смещения и потребляет их по порядку, если только смещение не было изменено для пропуска или повторного чтения сообщений.
- Брокер не знает, обработал ли потребитель ответы, даже при фиксации смещений в Kafka, поскольку смещения могут быть отправлены другому брокеру в кластере.
Основные свойства конфигурации консьюмера
1 2 3 4 5 6 7 | # ... bootstrap.servers=localhost:9092 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer client.id=my-client group.id=my-group-id # ... |
Где:
- bootstrap.servers (Требуется) Указывает потребителю подключаться к кластеру Kafka, используя адрес сервера загрузки host:port для брокера Kafka. Потребитель использует этот адрес для обнаружения и подключения ко всем брокерам в кластере. Используйте список, разделенный запятыми, чтобы указать два или три адреса на случай, если один из серверов не работает, но список всех брокеров в кластере предоставлять не обязательно. Если для открытия кластера Kafka используется служба балансировщика нагрузки, вам нужен только адрес службы, поскольку доступность будет обеспечиваться балансировщиком нагрузки.
- key.deserializer (Обязательно) Десериализатор для преобразования байтов, получаемых от брокера Kafka, в ключи сообщений.
- value.deserializer (Требуется) Десериализатор для преобразования байтов, полученных от брокера Kafka, в значения сообщений.
- client.id (Необязательно) Логическое имя клиента, которое используется в журналах и метриках для идентификации источника запроса. Этот идентификатор также может использоваться для дросселирования Консьюмера на основе квот времени обработки.
- group.id (Условно) Идентификатор группы необходим для того, чтобы потребитель мог присоединиться к группе потребителей.
Масштабирование потребления данных с помощью групп потребителей
Группы потребителей совместно используют обычно большой поток данных, генерируемый одним или несколькими производителями по заданной теме. Консьюмеры объединяются в группы с помощью свойства group.id, что позволяет распределять сообщения между членами группы. Один из консьюмеров в группе избирается лидером и решает, как назначить разделы потребителям в группе. Каждый раздел может быть назначен только одному потребителю.
Если у вас уже не так много потребителей, как разделов, вы можете масштабировать потребление данных, добавив больше экземпляров потребителей с тем же group.id. Добавление в группу большего количества потребителей, чем разделов, не поможет увеличить пропускную способность, но это означает, что в случае прекращения работы одного из потребителей он будет находиться в режиме ожидания. Если вы можете достичь пропускной способности с меньшим количеством потребителей, вы экономите ресурсы.
Консьюмеры в одной группе потребителей отправляют смещенные фиксации и сердцебиения одному и тому же брокеру. Поэтому чем больше консьюмеров в группе, тем выше нагрузка на брокер.
1 | group.id=my-group-id |
Добавление консьюмера в группу потребителей с использованием идентификатора группы.
Гарантии упорядочивания сообщений
Брокеры Kafka получают запросы на выборку от потребителей, которые просят брокера отправить сообщения из списка тем, разделов и позиций смещения.
Консьюмер наблюдает сообщения в одном разделе в том же порядке, в котором они были переданы брокеру, что означает, что Kafka предоставляет гарантии упорядочивания только для сообщений в одном разделе. И наоборот, если консьюмер потребляет сообщения из нескольких разделов, порядок сообщений в разных разделах, наблюдаемый потребителем, не обязательно отражает порядок, в котором они были отправлены.
Если вам нужно строгое упорядочивание сообщений из одной темы, используйте один раздел для каждого потребителя.
Оптимизация консьюмеров по пропускной способности и задержке
Контролируйте количество сообщений, возвращаемых при вызове вашим клиентским приложением функции KafkaConsumer.poll().
Используйте свойства fetch.max.wait.ms и fetch.min.bytes, чтобы увеличить минимальный объем данных, получаемых потребителем от брокера Kafka. Пакетирование по времени настраивается с помощью fetch.max.wait.ms, а пакетирование по размеру - с помощью fetch.min.bytes.
Если загрузка процессора потребителя или брокера высока, это может быть связано со слишком большим количеством запросов от потребителя. Вы можете настроить свойства fetch.max.wait.ms и fetch.min.bytes таким образом, чтобы количество запросов было меньше, а сообщения доставлялись большими партиями. При более высокой настройке повышается пропускная способность с некоторым ущербом для задержки. Вы также можете поднять эти параметры, если количество получаемых данных невелико.
Например, если вы установите fetch.max.wait.ms на 500 мс, а fetch.min.bytes на 16384 байта, когда Kafka получит запрос на выборку от Консьюмера, она ответит, когда будет достигнут первый из этих порогов.
И наоборот, вы можете уменьшить значения свойств fetch.max.wait.ms и fetch.min.bytes, чтобы увеличить сквозную задержку.
1 2 | fetch.max.wait.ms=500 fetch.min.bytes=16384 |
- fetch.max.wait.ms - Максимальное время в миллисекундах, которое брокер будет ждать перед выполнением запросов на выборку. По умолчанию установлено значение 500 миллисекунд.
- fetch.min.bytes - Если используется минимальный размер пакета в байтах, запрос будет отправлен, когда будет достигнут минимальный размер или сообщения будут находиться в очереди дольше, чем fetch.max.wait.ms (в зависимости от того, что наступит раньше). Добавление задержки позволяет партиям накапливать сообщения до размера партии.
Снижение задержки за счет увеличения размера запроса на выборку
Используйте свойства fetch.max.bytes и max.partition.fetch.bytes, чтобы увеличить максимальный объем данных, получаемых консьюмером от брокера Kafka.
Свойство fetch.max.bytes устанавливает максимальное ограничение в байтах на объем данных, получаемых от брокера за один раз.
Свойство max.partition.fetch.bytes устанавливает максимальное ограничение в байтах на количество возвращаемых данных для каждого раздела, которое всегда должно быть больше, чем количество байт, установленное в конфигурации брокера или темы для max.message.bytes.
Максимальный объем памяти, который может потреблять клиент, рассчитывается примерно так:
1 | NUMBER-OF-BROKERS * fetch.max.bytes и NUMBER-OF-PARTITIONS * max.partition.fetch.bytes. |
Если объем памяти позволяет, вы можете увеличить значения этих двух свойств. Благодаря увеличению количества данных в каждом запросе улучшается задержка, так как уменьшается количество запросов на выборку.
1 2 3 4 | # ... fetch.max.bytes=52428800 1 max.partition.fetch.bytes=1048576 2 # ... |
Где:
- fetch.max.bytes - Максимальный объем данных в байтах, возвращаемых по запросу fetch.
- max.partition.fetch.bytes - Максимальный объем данных в байтах, возвращаемых для каждого раздела.
Предотвращение потери или дублирования данных при фиксации смещений
Механизм автокоммита Kafka позволяет потребителю автоматически фиксировать смещения сообщений. Если этот механизм включен, потребитель будет фиксировать смещения, полученные в результате опроса брокера, с интервалом 5000 мс.
Механизм автокоммита удобен, но он создает риск потери и дублирования данных. Если консьюмер получил и преобразовал некоторое количество сообщений, но при выполнении автокоммита система падает с обработанными сообщениями в буфере консьюмера, эти данные будут потеряны. Если система падает после обработки сообщений, но до выполнения автокоммита, данные дублируются на другом экземпляре потребителя после ребалансировки.
Автокоммит может избежать потери данных только в том случае, если все сообщения будут обработаны до следующего опроса брокера или закрытия консьюмера.
Чтобы свести к минимуму вероятность потери или дублирования данных, вы можете установить значение enable.auto.commit на false и разработать клиентское приложение, чтобы иметь больше контроля над смещениями фиксации. Или вы можете использовать auto.commit.interval.ms, чтобы уменьшить интервалы между фиксациями.
1 2 3 | # ... enable.auto.commit=false 1 # ... |
enable.auto.commit - Значение auto commit установлено в false, чтобы обеспечить больший контроль над смещениями фиксации.
Установив значение enable.auto.commit в false, вы сможете фиксировать смещения после того, как вся обработка будет выполнена и сообщение будет потреблено. Например, вы можете настроить свое приложение на вызов API Kafka commitSync и commitAsync.
API commitSync фиксирует смещения в пакете сообщений, возвращенных в результате опроса. Вы вызываете этот API, когда закончили обработку всех сообщений в пакете. Если вы используете API commitSync, приложение не будет опрашивать новые сообщения до тех пор, пока не будет зафиксировано последнее смещение в пакете. Если это негативно сказывается на пропускной способности, вы можете фиксировать сообщения реже или использовать API commitAsync. API commitAsync не ждет ответа брокера на запрос фиксации, но рискует создать больше дубликатов при ребалансировке. Обычно в приложении сочетаются оба API фиксации, причем API commitSync используется непосредственно перед выключением Консьюмера или ребалансировкой, чтобы убедиться, что финальная фиксация прошла успешно.
Управление транзакционными сообщениями
Рассмотрите возможность использования транзакционных идентификаторов и включения idempotence (enable.idempotence=true) на стороне производителя, чтобы гарантировать доставку точно в один раз. На стороне потребителя вы можете использовать свойство isolation.level, чтобы контролировать чтение транзакционных сообщений потребителем.
Свойство isolation.level имеет два допустимых значения:
- read_committed
- read_uncommitted (по умолчанию)
Используйте read_committed, чтобы убедиться, что потребитель читает только транзакционные сообщения, которые были зафиксированы. Однако это приведет к увеличению сквозной задержки, поскольку потребитель не сможет вернуть сообщение, пока брокеры не запишут маркеры транзакций, которые фиксируют результат транзакции (зафиксирована или прервана).
1 2 3 4 | # ... enable.auto.commit=false isolation.level=read_committed 1 # ... |
isolation.level - Установите значение read_committed, чтобы консьюмер читал только зафиксированные сообщения.
Восстановление после сбоя во избежание потери данных
Используйте свойства session.timeout.ms и heartbeat.interval.ms, чтобы настроить время, необходимое для проверки и восстановления после отказа потребителей в группе потребителей.
Свойство session.timeout.ms определяет максимальное время в миллисекундах, в течение которого потребитель в группе потребителей может находиться вне связи с брокером, прежде чем он будет признан неактивным и будет запущена ребалансировка между активными потребителями в группе. Когда группа перебалансируется, разделы переназначаются членам группы.
Свойство heartbeat.interval.ms задает интервал в миллисекундах между проверками сердцебиения координатору группы потребителей, чтобы показать, что потребитель активен и подключен. Интервал сердцебиения должен быть меньше, обычно на треть, чем интервал таймаута сессии.
Если установить свойство session.timeout.ms меньше, то сбойные потребители будут обнаружены раньше, и ребалансировка может быть проведена быстрее. Однако следите за тем, чтобы таймаут не был настолько мал, что брокер не сможет вовремя получить сердечный сигнал и запустит ненужную ребалансировку.
Уменьшение интервала сердцебиения снижает вероятность случайной ребалансировки, но более частые сердцебиения увеличивают нагрузку на ресурсы брокера.
Управление политикой смещения
Используйте свойство auto.offset.reset для управления поведением консьюмера, если смещения не были зафиксированы, или зафиксированное смещение больше не действует или удалено.
Предположим, вы впервые развертываете потребительское приложение, и оно читает сообщения из существующей темы. Поскольку group.id используется впервые, тема __consumer_offsets не содержит никакой информации о смещениях для этого приложения. Новое приложение может начать обработку всех существующих сообщений с начала журнала или только новых сообщений. По умолчанию используется значение сброса latest, которое начинается с конца раздела, и, следовательно, некоторые сообщения будут пропущены. Чтобы избежать потери данных, но увеличить объем обработки, установите значение auto.offset.reset на earliest, чтобы начинать с начала раздела.
Также рассмотрите возможность использования самого раннего варианта, чтобы избежать потери сообщений после окончания периода хранения смещений (offsets.retention.minutes), настроенного для брокера. Если группа потребителей или отдельный потребитель неактивны и не фиксируют смещения в течение периода хранения, ранее зафиксированные смещения удаляются из __consumer_offsets.
1 2 3 4 5 | # ... heartbeat.interval.ms=3000 1 session.timeout.ms=45000 2 auto.offset.reset=earliest 3 # ... |
Где:
- heartbeat.interval.ms - Настройте меньший интервал сердцебиения в соответствии с ожидаемыми ребалансами.
- session.timeout.ms - Если брокер Kafka не получит ни одного сердцебиения до истечения таймаута, консьюмер будет удален из группы потребителей и будет инициирована ребалансировка. Если в конфигурации брокера указаны значения group.min.session.timeout.ms и group.max.session.timeout.ms, значение тайм-аута сессии должно находиться в этом диапазоне.
- auto.offset.reset- Установите значение earliest, чтобы вернуться к началу раздела и избежать потери данных, если смещения не были зафиксированы.
Если объем данных, возвращаемых в одном запросе на выборку, велик, может произойти тайм-аут до того, как консьюмер обработает их. В этом случае можно уменьшить max.partition.fetch.bytes или увеличить session.timeout.ms.
Минимизация влияния перебалансировки
Ребалансировка раздела между активными потребителями в группе - это время, которое требуется для:
- Консьюмеры зафиксировали свои смещения
- Формирование новой группы потребителей
- Лидер группы назначает разделы членам группы
- Консьюмеры в группе получают свои назначения и начинают выборку
Очевидно, что этот процесс увеличивает время простоя службы, особенно если он повторяется во время скользящего перезапуска кластера групп потребителей.
В этой ситуации можно использовать концепцию статического членства, чтобы сократить количество перебалансировок. При перебалансировке разделы тем равномерно распределяются между членами группы потребителей. Статическое членство использует постоянство, чтобы экземпляр потребителя был распознан во время перезапуска после таймаута сеанса.
Координатор группы потребителей может идентифицировать новый экземпляр потребителя с помощью уникального идентификатора, который задается с помощью свойства group.instance.id. Во время перезапуска потребителю присваивается новый идентификатор члена, но как статический член он продолжает использовать тот же идентификатор экземпляра, и назначение тематических разделов происходит так же.
Если приложение-потребитель не выполняет вызов опроса хотя бы каждые max.poll.interval.ms миллисекунд, потребитель считается вышедшим из строя, что приводит к ребалансировке. Если приложение не успевает обработать все записи, полученные в результате опроса, можно избежать ребалансировки, используя свойство max.poll.interval.ms для указания интервала в миллисекундах между опросами новых сообщений от консьюмера. Или вы можете использовать свойство max.poll.records, чтобы установить максимальное ограничение на количество записей, возвращаемых из буфера потребителя, что позволит вашему приложению обрабатывать меньшее количество записей в рамках ограничения max.poll.interval.ms.
1 2 3 4 5 | # ... group.instance.id=UNIQUE-ID max.poll.interval.ms=300000 max.poll.records=500 # ... |
Где:
- group.instance.id - Уникальный идентификатор экземпляра гарантирует, что новый экземпляр потребителя получит то же самое назначение тематических разделов.
- max.poll.interval.ms - Задайте интервал для проверки того, что консьюмер продолжает обрабатывать сообщения.
- max.poll.records - Устанавливает количество обработанных записей, возвращаемых потребителем.