Size: a a a

2019 November 22

M

Mm in pro.kafka
Коллеги привет, есть ситуация - мы передаём данные из Кафки в другую систему. Бывают случае когда она недоступна, и мы накапливаем данные во внутреннем буфере. И чтобы буфера не распухали, в какой то момент надо остановить полл из кафки. Но при этом хочется чтобы координатор понимал, что мы не полим специально, и нас не надо в список мертвых относить. Вопрос следующий - можно ли посылать какие ниб хертбиты из консьюмера , чтобы брокер знал что мы живые., просто намеренно не забираем данные. Или может быть есть какое то другое решение такой проблемы. Спасибо
источник

IK

Ilia Khaustov in pro.kafka
Можно же забирать но не коммитить
источник

AM

Aset Madraimov in pro.kafka
Ayrat Hudaygulov
есть ли смысл 300КБ месаджи ещё и в батч упаковывать?
без батчинга высокого рейта не будет. 300КБ это норм. Надо тестить на каком размере батча лучший рейт
источник

M

Mm in pro.kafka
Ilia Khaustov
Можно же забирать но не коммитить
Можно , мы так и делаем. Память то не бесконечная, в какой то момент может кончится)
источник

AM

Aset Madraimov in pro.kafka
Mm
Можно , мы так и делаем. Память то не бесконечная, в какой то момент может кончится)
А крашиться до восстановления удаленной системы не вариант?
источник

IK

Ilia Khaustov in pro.kafka
Mm
Можно , мы так и делаем. Память то не бесконечная, в какой то момент может кончится)
Я так и не понял, зачем в принципе накапливать данные во внутреннем буфере консумера, когда есть кафка? Почему продюсеру нужен буфер - понятно, но тут как-то не очень. Потенциально правильный сценарий такой: внешняя система недоступна - продолжаем консумить но отключаем коммиты, все сообщения уходят в /dev/null и память не занимают; система доступна - начинаем коммитить, лаг по партициям уменьшается.
источник

M

Mm in pro.kafka
Aset Madraimov
А крашиться до восстановления удаленной системы не вариант?
В теории можно и в эту сторону подумать
источник

A

Alex in pro.kafka
там же есть даже suspend операция на партиции
источник

A

Alex in pro.kafka
ну и последние же кафки вроде как хертбит отдельно шлют
источник

A

Alex in pro.kafka
public void pause(Collection<TopicPartition> partitions)

Suspend fetching from the requested partitions. Future calls to poll(long) will not return any records from these partitions until they have been resumed using resume(Collection). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.
источник

M

Mm in pro.kafka
Ilia Khaustov
Я так и не понял, зачем в принципе накапливать данные во внутреннем буфере консумера, когда есть кафка? Почему продюсеру нужен буфер - понятно, но тут как-то не очень. Потенциально правильный сценарий такой: внешняя система недоступна - продолжаем консумить но отключаем коммиты, все сообщения уходят в /dev/null и память не занимают; система доступна - начинаем коммитить, лаг по партициям уменьшается.
Консьюмер то свой локальный офсет имеет, и если мы так будем делать то куча сообщений вылетит в трубу, в какой то момент скажем коммит и окончательно их потеряем
источник

M

Mm in pro.kafka
Alex
ну и последние же кафки вроде как хертбит отдельно шлют
Вместе с poll
источник

M

Mm in pro.kafka
Alex
public void pause(Collection<TopicPartition> partitions)

Suspend fetching from the requested partitions. Future calls to poll(long) will not return any records from these partitions until they have been resumed using resume(Collection). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.
Вот тоже что нашёл - похоже так и придётся
источник

AM

Aset Madraimov in pro.kafka
Mm
Консьюмер то свой локальный офсет имеет, и если мы так будем делать то куча сообщений вылетит в трубу, в какой то момент скажем коммит и окончательно их потеряем
В зависимости от того когда делается коммит, после успешной записи в удаленную систему, либо после того как вы получили сообщение в своем приложении. От этого зависит delivery semantics. Согласен с вариантом не коммитить
источник

A

Alex in pro.kafka
Mm
Вместе с poll
источник

A

Alex in pro.kafka
от он макс пул интервала только паузой рулить
источник

A

Alex in pro.kafka
то есть сейчас можно смело ставить настройки вида

session.timeout.ms: 10s
max.poll.interval.ms: 5min
max.poll.records: 500


и это будет работать
источник

A

Alex in pro.kafka
источник

A

Alex in pro.kafka
вот доп описание
источник

M

Mm in pro.kafka
Ага, все ясно. Но мою проблему вроде все равно не решает. Так как после того как выгорит max.poll.interval.ms консьюмер будет считаться мертвым - верно?
источник