Никак не пойму как обрабатывать такие случаи с KafkaStreams.
(Spring cloud streams. KafkaStreams binder.)
Если при обработке стрима получаем исключение, то стрим останавливается. Ок, можно обернуть каждый шаг в try.. catch. Но как быть если ошибка возникла при сохранении в statestore промежуточного результата join, например. При этом при перезапуске приложения оно снова читает это же сообщение и опять валится.
Главный вопрос, как сдвинуть offset консьюмеров или закоммитить это сообщение при получении такой ошибки, ведь получается один poison message может остановить обработку всего потока сообщений.
У меня возникла ошибка при сераилизации в Avro, в io.confluent.kafka.serializers.KafkaAvroSerializer
Стектрейс по ссылке
https://pastebin.com/0qsfuFmg