Size: a a a

2020 September 30

VG

Vik Gamov in pro.kafka
Это их фреймворк делает routing. Kafka Streams так не может из коробки
источник

VA

Vektor AB in pro.kafka
А по фреймворку может кто подсказать? У меня в конфиге все то же самое, за исключением, что используется только Function. На выход берет адреса брокеров из kafka1 биндера, а должен из kafka2.
источник

IT

Ivan Torgashov in pro.kafka
Vik Gamov
А почему не коннектор?
rdkafka не умеет коннектор, соответственно и гошка тоже
источник

t

terancet in pro.kafka
Alexey Melchakov
Никак не пойму как обрабатывать такие случаи с KafkaStreams.
(Spring cloud streams. KafkaStreams binder.)

Если при обработке стрима получаем исключение, то стрим останавливается. Ок, можно обернуть каждый шаг в try.. catch. Но как быть если ошибка возникла при сохранении в statestore промежуточного результата join,  например. При этом при перезапуске приложения оно снова читает это  же сообщение и опять валится.

Главный вопрос, как сдвинуть offset консьюмеров или закоммитить это сообщение при получении такой ошибки, ведь получается один poison message может остановить обработку всего потока сообщений.

У меня возникла ошибка при сераилизации в Avro, в io.confluent.kafka.serializers.KafkaAvroSerializer
Стектрейс по ссылке

https://pastebin.com/0qsfuFmg
Мне кажется, что Spring Cloud Stream Kafka Binder не умеет из коробки обрабатывать такие кейсы.

Вот цитата из документации
«It continues to remain hard to robust error handling using the high-level DSL; Kafka Streams doesn’t natively support error handling yet.»

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.1.RELEASE/spring-cloud-stream-binder-kafka.html#_error_handling
источник

AM

Alexey Melchakov in pro.kafka
Но я вот не пойму даже как это можно сделать только с помощью кафка стримс. Можно задать три обработчика ошибок. DeserializationExceptionHandler, ProductionExceptionHandler и UncaightExceptionHandler. В первые два я не попадаю, а третий когда ловит, то в нем есть только возможность сделать stream.stop(); System.exit() или stream.start(), но после перезапуска обработка начинается с того же оффсета.
источник
2020 October 01

SP

Sergey Pichkurov in pro.kafka
так из-за чего, собственно, ошибка происходит? если первая исключение происходит в коде приложения, то его надо как-то обрабатывать самому
источник

SP

Sergey Pichkurov in pro.kafka
кто-то юзал  MM2 на предмет миграции Kafka Streams приложений?
например, такой use case:
1) имеем X KS приложений, которые надо поочередно мигрировать из кластера A в кластер В
2) для каждого приложения:
2.1) зераклим входные и внутрение топики из А в В, где они получают новые имена (Tn => B.Tn).
полагаемся, что офсеты консюмеров зеркалятся автоматически.
2.2) в В создаем выходные топики B.TO и зеркалим их обратно в A (B.TOn => TOn). Здесь возникает вопрос что префикс A на обратном зеркалировании надо убирать, вроде можно через конфиги ?
2.3) останавливаем наш апп в А и рестартуем в В. тут другой вопрос - как менять конфигурацию для В что бы мы подхватили новые имена топиков (с префиксами) и реплицировнные офсеты консюмеров - и то и другое зависит от application.id  
3) в принципе, для 2.1 можно реплицировать без префикса (как для 2.2). так, наверное, будет даже проще.  это уже не будет active-active, но в моем случае это не так важно. к сожалению, не смог найти какой-либо конткретики для мигации KS приложений, в основном все про low level. непонятно почему нигде не обсуждается специфика KS, тк ньюансы и вопросы (например, то что KS приложение это Producer и Consumer в одном флаконе, и разделить их никак)  очевидны ...
источник

AM

Alexey Melchakov in pro.kafka
Sergey Pichkurov
так из-за чего, собственно, ошибка происходит? если первая исключение происходит в коде приложения, то его надо как-то обрабатывать самому
Вот в том то и проблема, что ошибка при сериализации во внутренний стейтстор при выполнении join.
источник

SP

Sergey Pichkurov in pro.kafka
а конкретнее, почему возникает ошибка, разбирались ? возможная причина - несовместимое изменение формата результата, если так то это надо решать репроцессингом
источник

AM

Alexey Melchakov in pro.kafka
Sergey Pichkurov
а конкретнее, почему возникает ошибка, разбирались ? возможная причина - несовместимое изменение формата результата, если так то это надо решать репроцессингом
да, причина ошибки ясна. конкретно это попытка сеарелизовать BigDecimal co scale=0 в авро тип, который требует scale=2.

Понятно, что это нужно чинить и ловить на тестах.

Но  вопрос в том, как защитить себя от такой ошибки на проде, при которой из-за одного битого сообщения возможна ситуация, когда приложение полностью блокируется и нет возмжности  продолжить его работу из-за того что оно не может пропустить такое сообщение.
источник

OG

Oleg Gavrilov in pro.kafka
Тесты серде в авро и обратно, перед деплоем?
источник

SP

Sergey Pimenov in pro.kafka
Oleg Gavrilov
Тесты серде в авро и обратно, перед деплоем?
Не отловил все комбинации данных и из за одного сообщеня откат на проме?)
источник

OG

Oleg Gavrilov in pro.kafka
Критикуешь-предлагай))
источник

OG

Oleg Gavrilov in pro.kafka
Как вариант обязательные ревью схемы данных
источник

OG

Oleg Gavrilov in pro.kafka
Кто-то же решил scale=0 поставить
источник

SP

Sergey Pimenov in pro.kafka
Это все костыли и косвенные вещи, проблему они не решают, а откладывают
источник

AM

Alexey Melchakov in pro.kafka
Да верно. но ведь тесты не 100 %   гарантия  что не будет ошибки.


Например, если мы пишем  рест сервис и прилетел запрос с некорректным форматом. Ок - отбили его и работаем дальше. А как быть со стримами
источник

SP

Sergey Pimenov in pro.kafka
Я бы предложил, но мы с Лехой в одной команде 😁😁
источник

VB

Vadim Boldyrev in pro.kafka
Sergey Pimenov
Это все костыли и косвенные вещи, проблему они не решают, а откладывают
Нужно пропускать такие сообщения? Или как?
источник

SP

Sergey Pimenov in pro.kafka
Хотя бы
источник