Size: a a a

2020 March 08

S🕶

Sander 🕶 in pro.kafka
producer не добавил конфиги нужные, как же я спать хочу ))
источник

S🕶

Sander 🕶 in pro.kafka
@ivan_ponomarev , @gamussa я на 5 минут отойду поспать, спасибо за помощь - сейчас постараюсь в своей проект перетащить,
источник

S🕶

Sander 🕶 in pro.kafka
@ivan_ponomarev а как быть с тем что, у меня есть второй проект - где крутится демон,
и он в раз секунду отправляет данные, получаю ту же ошибку - что class not trusted.

Если это микросервисы, то я никак не смогу взять из другой машины путь/тип класса и подсунуть в конфиг.
источник

S🕶

Sander 🕶 in pro.kafka
не понимаю почему так сложно сделано, могло бы просто прилететь сообщение и на лету попытаться смапить с объектом,
если нет кастомного serialize/deserialize и все, нет надо какие-то типы указывать.
источник

S🕶

Sander 🕶 in pro.kafka
@ivan_ponomarev @gamussa - а как такую проблему решить?
у меня два отдельных проекта, которые между собой посылают сообщения и я уже не могу в этом случаи в конфиг вписать путь до класса,
потому что он в другом проекте и крутится на другой машине.
источник

IP

Ivan Ponomarev in pro.kafka
Sander 🕶
@ivan_ponomarev @gamussa - а как такую проблему решить?
у меня два отдельных проекта, которые между собой посылают сообщения и я уже не могу в этом случаи в конфиг вписать путь до класса,
потому что он в другом проекте и крутится на другой машине.
я не очень понял вопрос
вопрос что, в том, что в другом проекте у тебя нет на ClassPath класса, который ты хочешь десериализовать? ну тогда никак)
источник

IP

Ivan Ponomarev in pro.kafka
Sander 🕶
@ivan_ponomarev а как быть с тем что, у меня есть второй проект - где крутится демон,
и он в раз секунду отправляет данные, получаю ту же ошибку - что class not trusted.

Если это микросервисы, то я никак не смогу взять из другой машины путь/тип класса и подсунуть в конфиг.
я не знаю, понял ли ты, что сообщение "class not trusted" не соответствовало реальной проблеме и вообще не имело отношения к проблеме.
источник

IP

Ivan Ponomarev in pro.kafka
проблема была не в trusted classes, а в том, что не сконфигурирован десериализатор
источник

S🕶

Sander 🕶 in pro.kafka
Ivan Ponomarev
я не знаю, понял ли ты, что сообщение "class not trusted" не соответствовало реальной проблеме и вообще не имело отношения к проблеме.
да это я понял, просто ошибка та же самая,

Проблема в том что: у меня второе приложение "демон", который крутится и push-ит события в kafka,
и есть второе приложение, которые тянет данные из kafka как consumer, но так как второе приложение не знает, какие там классы у первого приложения,
он кидает снова эту ошибку "class not trusted", конечно же я никак не могу добавить этот класс в конфиг, он в другом проекте лежит.
источник

S🕶

Sander 🕶 in pro.kafka
у меня как-то до этого работало,  public void listen(@Payload TicketDto ticketDto) { - он просто маппил филды json -> object и все,
сейчас он жалуется
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition accept-ticket-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.ticketsdaemon.domain.Ticket]; nested exception is java.lang.ClassNotFoundException: com.example.ticketsdaemon.domain.Ticket
источник

S🕶

Sander 🕶 in pro.kafka
😭
источник

S🕶

Sander 🕶 in pro.kafka
что-то опять с конфигом, со старым конфигом работает
источник

S🕶

Sander 🕶 in pro.kafka
все починил
источник

S🕶

Sander 🕶 in pro.kafka
там два конфига надо было
источник

S🕶

Sander 🕶 in pro.kafka
один для string типа
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
  consumerConfigs(),
  new StringDeserializer(),
  new StringDeserializer()
);
}
источник

S🕶

Sander 🕶 in pro.kafka
и один наш
источник

S🕶

Sander 🕶 in pro.kafka
😎
источник

S🕶

Sander 🕶 in pro.kafka
источник

IP

Ivan Ponomarev in pro.kafka
Alexey Konyaev
только тут я не заморачивался с тем, как пометить значение признаком "уже посчитали один раз" и просто добавил строковую метку.
В общем. Похоже, что полностью корректное решение этой задачи, с окнами и стрианием старых записей по retention, описано здесь: https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html
Я сегодня изучал вопрос и проводил эксперименты. Не согласен, что там используется Transformer вместо ValueTransformer (это может привести к лишнему репартиционрированию дальше по конвейеру), но в целом  корректно

А вообще, в KStreamsAPI явно нужен метод distinct(TimeWindows, EventIdExctractor)
источник

AK

Alexey Konyaev in pro.kafka
Ага, там идея такая же, как наша, только ещё есть ретеншн за счёт window-store.
источник