Size: a a a

2019 November 20

IK

Ivan Klass in pro.kafka
Всем привет. Неожиданно выяснили на практике (P1 на проде), что KTable.toStream может скипать промежуточные апдейты - как-то можно от этого уйти?
источник

IK

Ivan Klass in pro.kafka
В смысле, чтобы после каждого выполнения .aggregate в стриме из таблицы гарантировано был event
источник
2019 November 21

АЕ

Александр Ефимов in pro.kafka
Всем привет! Прошу помощи.
Получаем вот такую ошибку при записи сообщений в топик
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for paymentOrderIn-9: 35116 ms has passed since batch creation plus linger time
На сколько я понял, погуглив, что это значит что сообщений поступает больше чем продюсер может их отправить прямо сейчас и поэтому складывает в буфер и потом разом их отправляет?
Но из исключения видно что всего 3 рекорда Expiring 3 record(s).
Параметры request.timeout.ms и batch-size не переопределяли и значит они у нас дефолтные.

Правильно ли я понимаю, что в таком случае (если всетаки нужно буферизировать), продюсер ждет request.timeout.ms и если за это время сообщений меньше чем batch-size то генерится это исключение?
источник

АЕ

Александр Ефимов in pro.kafka
Producer это java приложение
источник

АЕ

Александр Ефимов in pro.kafka
используем
<dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>1.1.1</version>
       </dependency>
источник

АЕ

Александр Ефимов in pro.kafka
Все таки глянул в исходники:)
private static final String BATCH_SIZE_DOC = "...
...
This configuration controls the default batch size in bytes.
..."

Блин, это же не количество сообщений в пакете, а размер в байтах.
Тогда batch.size можно уменьшить
источник

IR

Ivan Rasikhin in pro.kafka
Александр Ефимов
Все таки глянул в исходники:)
private static final String BATCH_SIZE_DOC = "...
...
This configuration controls the default batch size in bytes.
..."

Блин, это же не количество сообщений в пакете, а размер в байтах.
Тогда batch.size можно уменьшить
Я бы ещё посоветовал выставить linger.ms в 5-10 чтобы гарантированно отправлять неполные батчи
источник

АЕ

Александр Ефимов in pro.kafka
Спасибо, попробуем
источник

IK

Ivan Klass in pro.kafka
Ivan Klass
В смысле, чтобы после каждого выполнения .aggregate в стриме из таблицы гарантировано был event
Помогло materialized.withCachingDisabled(). Поведение описано в доке:
https://kafka.apache.org/23/documentation/streams/developer-guide/memory-mgmt.html#id1
источник

AD

Aleksey Dobrunov in pro.kafka
подскажите, если в рамках батча кидаем в кафку 5 сообщений, которые попадут в одну партицию , гарантируется ли что они будут последовательно идти по офсетам, без разрывов. или на стороне кафки параллельный запрос может вставить в середину данные ?
источник

A

Alex in pro.kafka
по идее разрывов не должно быть, но как вы гарантируете что запись прошла батчем?
источник

A

Alex in pro.kafka
запихиваете list и flush?
так как если добавляете по одному, а потом flush то гарантий таких у вас нету, внутренний флаш по таймеру или размеру буфера у вас может и раньше случится
источник

AD

Aleksey Dobrunov in pro.kafka
вопрос пока гипотетический. понял, значит пытаться сделать такое не стоит.  спасибо
источник

IR

Ivan Rasikhin in pro.kafka
Aleksey Dobrunov
подскажите, если в рамках батча кидаем в кафку 5 сообщений, которые попадут в одну партицию , гарантируется ли что они будут последовательно идти по офсетам, без разрывов. или на стороне кафки параллельный запрос может вставить в середину данные ?
батчи параллельно в партицию не летят из одного продюсера
источник

AD

Aleksey Dobrunov in pro.kafka
я про разные продюсеры, конечно же. если у них ключ совпадет и данные в одну партицию прилетят
источник

AD

Aleksey Dobrunov in pro.kafka
предыдудщий вопрос пошел от недопонимания работы с транзакциями. смотрите, я публикую 5 сообщений в транзакции. им присвоены офсеты  5,6,7,15,20. транзакцию я еще не подтвердил. параллельно другой продюсер записал 8-14, 16-19 . и подтвердил. получается 8-14/16-19 не будут доступны для чтения пока я не подтвержу транзакцию для 20, или она не отвалится по таймауту?
источник

SB

Sergey Bezrukov in pro.kafka
Aleksey Dobrunov
предыдудщий вопрос пошел от недопонимания работы с транзакциями. смотрите, я публикую 5 сообщений в транзакции. им присвоены офсеты  5,6,7,15,20. транзакцию я еще не подтвердил. параллельно другой продюсер записал 8-14, 16-19 . и подтвердил. получается 8-14/16-19 не будут доступны для чтения пока я не подтвержу транзакцию для 20, или она не отвалится по таймауту?
а оффсеты не в момент коммита присваиваются? не знаю как реально сделано, но это было бы логично, кмк
источник

N

Nikolay in pro.kafka
Офсеты они в файле же
источник

N

Nikolay in pro.kafka
Присваиваются перед непосредственной записью в лог файл
источник

N

Nikolay in pro.kafka
Aleksey Dobrunov
предыдудщий вопрос пошел от недопонимания работы с транзакциями. смотрите, я публикую 5 сообщений в транзакции. им присвоены офсеты  5,6,7,15,20. транзакцию я еще не подтвердил. параллельно другой продюсер записал 8-14, 16-19 . и подтвердил. получается 8-14/16-19 не будут доступны для чтения пока я не подтвержу транзакцию для 20, или она не отвалится по таймауту?
А какая семантика чтения ?
источник