Size: a a a

2020 February 20

IR

Ivan Rasikhin in pro.kafka
пока состояние не восстановлено стримы не стартуют
источник

IR

Ivan Rasikhin in pro.kafka
после того как состояние вычитано стримы начинают работу
источник

IR

Ivan Rasikhin in pro.kafka
с места последней остановки
источник

IR

Ivan Rasikhin in pro.kafka
поэтому есть иногда эффект на крупных топиках что приложение стартует долго
источник

IR

Ivan Rasikhin in pro.kafka
так как вычитывается топик в стор
источник

IR

Ivan Rasikhin in pro.kafka
ну это насколько я знаю стримы)
источник

IR

Ivan Rasikhin in pro.kafka
само собой когда я говорю про состояние я имею ввиду ktable/globalktable/store + changelog
источник

IR

Ivan Rasikhin in pro.kafka
Юрий Бадальянц
Ну вот у меня включена полная оптимизация, и почему-то такого не происходит. Пока я явно логирование не выключу, ченжлог создаётся
if (shouldReuseSourceTopicForChangelog) {
                   storeBuilder.withLoggingDisabled();
                   topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
               }
вот кусок кода из стримов который отключает логгирование при оптимизации
источник

IR

Ivan Rasikhin in pro.kafka
возможно вы создаете свой собственный Materialized и там явно включено логирование
источник

ЮБ

Юрий Бадальянц in pro.kafka
Я создаю свой собственный с выключенным логингом
источник

ЮБ

Юрий Бадальянц in pro.kafka
Ivan Rasikhin
if (shouldReuseSourceTopicForChangelog) {
                   storeBuilder.withLoggingDisabled();
                   topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
               }
вот кусок кода из стримов который отключает логгирование при оптимизации
Покурю исходники на тему проставления флага
источник

ЮБ

Юрий Бадальянц in pro.kafka
Какая-то мутная вообще тема
источник

IR

Ivan Rasikhin in pro.kafka
это да, я сколько не пытался хорошо разобрать исходники стримов, никак нормаьлно не получается)
источник

VG

Vik Gamov in pro.kafka
Ivan Ponomarev
Касательно тестирования стримов, @gamussa ! Я тут на днях обнаружил кейс, когда в TestDriver всё хорошо проходит, а в настоящем кластере не работает.
Оказывается, TestDriver не одинаково с настоящим кластером прописывает headers, и десериализатор, который учитывает headers, может отлично тебе показать зелёные тесты в TestDriver, но абсолютно дико повести себя в настоящем кластере. Если нужно, дарю тебе этот пример для доклада, все подробности могу рассказать
Давай!
источник

IP

Ivan Ponomarev in pro.kafka
Vik Gamov
Давай!
Рассказываю

Тестировалась топология, включающая в себя операцию merge, в которую приходили данные из двух разных топиков, данные в одном из которых приводились к нужному типу через mapValues. На TopologyTestDriver построили исчерпывающий набор тестов, всё было ОК.

Запустили в dev environment -- полная чепуха. Не работает как надо.

Скачали из кафки дамп данных, пропустили их через TopologyTestDriver -- снова всё ОК, не воспроизводится проблема.

Пришлось отлаживаться на настоящей кафке, и оказалось, что использованный нами JSonSerde из Spring Kafka по умолчанию учитывает заголовки typeid при десериализации. Эти заголовки (если специально не запретить) имеют приоритет над targetType, переданным десериализатору при создании. А теперь внимание. В нашем случае (вероятнее всего, из-за глюков в реализации KStream.merge, но может и по иной причине) заголовков typeid было два, унаследованных из обоих топиков. Десериализатор брал первый попавшийся (неправильный) и делал неверную вещь.

Честно говоря, о важности заголовков я до сих пор не задумывался. И -- факт: в том что касается заголовков, TopologyTestDriver ведёт себя неизоморфно настоящему кластеру. Поэтому -- вывод для твоего доклада: тестдрайвер не отменяет теста на  TestContainers.
источник

EI

Eugene Ivlev in pro.kafka
Ivan Rasikhin
ktable на старте восстанавливает состояние из source топика
При выключенном changelog восстановления не будет. Только если global store. Обычный store с оффсета 0 не восстанавливает. Только подписывается на топик. Все новые данные будут попадать в store.
источник

VG

Vik Gamov in pro.kafka
Ivan Ponomarev
Рассказываю

Тестировалась топология, включающая в себя операцию merge, в которую приходили данные из двух разных топиков, данные в одном из которых приводились к нужному типу через mapValues. На TopologyTestDriver построили исчерпывающий набор тестов, всё было ОК.

Запустили в dev environment -- полная чепуха. Не работает как надо.

Скачали из кафки дамп данных, пропустили их через TopologyTestDriver -- снова всё ОК, не воспроизводится проблема.

Пришлось отлаживаться на настоящей кафке, и оказалось, что использованный нами JSonSerde из Spring Kafka по умолчанию учитывает заголовки typeid при десериализации. Эти заголовки (если специально не запретить) имеют приоритет над targetType, переданным десериализатору при создании. А теперь внимание. В нашем случае (вероятнее всего, из-за глюков в реализации KStream.merge, но может и по иной причине) заголовков typeid было два, унаследованных из обоих топиков. Десериализатор брал первый попавшийся (неправильный) и делал неверную вещь.

Честно говоря, о важности заголовков я до сих пор не задумывался. И -- факт: в том что касается заголовков, TopologyTestDriver ведёт себя неизоморфно настоящему кластеру. Поэтому -- вывод для твоего доклада: тестдрайвер не отменяет теста на  TestContainers.
Respect
источник

IR

Ivan Rasikhin in pro.kafka
Eugene Ivlev
При выключенном changelog восстановления не будет. Только если global store. Обычный store с оффсета 0 не восстанавливает. Только подписывается на топик. Все новые данные будут попадать в store.
спасибо за коммент, да - правда ktable сам по себе не восстанавливается, только если включена оптимизация либо логируется store
источник

EI

Eugene Ivlev in pro.kafka
А что за оптимизация? В processor api можно включить?
источник

IR

Ivan Rasikhin in pro.kafka
Properties properties = new Properties();
properties.put(TOPOLOGY_OPTIMIZATION, OPTIMIZE);
streamsBuilder.build(properties);

только для DSL
источник