Size: a a a

2020 March 07

IP

Ivan Ponomarev in pro.kafka
Aleksey Smirnov
Вот мы у себя решили в итоге рядом держать Redis, на который будут смотреть и producer и consumer.
Producer перед добавлением сообщения в топик сходит в ключ Redis типа Set, проверит через SISMEMBER нет ли такого ключа уже, если нет - положит в топик и добавит в SET в Redis.
А consumer после успешной обработки сообщения и ACK - удалит ключ из SET в Redis.
Накладные расходы лишние, но что поделать, зато очень просто в реплизации.
Dual write короче) причём в виде как вы описали, если консьюмер справится с обработкой быстрее чем продюсер запихнёт значение в set, мы получим навек застрявшую в set запись (наверняка можно придумать и иные аномальные сценарии)

Но я понимаю, в вашем случае это может не быть проблемой. Redis так хорош, так прост, так соблазнителен) я тоже затыкаю им всякие дыры, но с гаденьким ощущением что делаю что-то не то)))
источник

AS

Aleksey Smirnov in pro.kafka
Ivan Ponomarev
Dual write короче) причём в виде как вы описали, если консьюмер справится с обработкой быстрее чем продюсер запихнёт значение в set, мы получим навек застрявшую в set запись (наверняка можно придумать и иные аномальные сценарии)

Но я понимаю, в вашем случае это может не быть проблемой. Redis так хорош, так прост, так соблазнителен) я тоже затыкаю им всякие дыры, но с гаденьким ощущением что делаю что-то не то)))
это да, про ощущение 😁
да вроде не должно застрять на первый взгляд, если мы сначала до кафки с ним работаем, а потом после кафки. только если сообщение потеряем - но это уже нештатная ситуация, реплики ведь будут.
источник

AS

Aleksey Smirnov in pro.kafka
да, могут быть нюансы конечно..
в редис создали, в топик не смогли.. надо этот вариант сбоя закрыть будет.
но экспертизы нет такой пока в команде, чтобы сделать по другому
источник

S🕶

Sander 🕶 in pro.kafka
Всем привет
источник

S🕶

Sander 🕶 in pro.kafka
нужна помощь по Kafka, уже весь день мучаюсь с этим,
источник

S🕶

Sander 🕶 in pro.kafka
значит ситуация следующая, у меня сообщения consumer принимает - только вот с такими параметрами в аннотации:
@KafkaListener(topics = "${producer.topic.create-ticket}", containerFactory = "${container.factory.create-ticket}")
источник

S🕶

Sander 🕶 in pro.kafka
если посмотреть на весь код, то как containerFactory, будет работать с разными типами: CreateTicket, CreateTicket2, CreateTicket3?
только с одним методом работает, а на каждый @KafkaHandler нельзя добавить containerFactory.

@KafkaListener(topics = "${producer.topic.create-ticket}", containerFactory = "${container.factory.create-ticket}")
@Service
public class TicketServiceCommandHandler {

@KafkaHandler
private void createTicket(CreateTicket command) {
             ....
}

      @KafkaHandler
private void createTicket(CreateTicketTest2 command) {
             ....
}

        @KafkaHandler
private void createTicket(CreateTicketTest3 command) {
             ....
}
}
источник

S🕶

Sander 🕶 in pro.kafka
🤔
источник

S🕶

Sander 🕶 in pro.kafka
собственно где проблема? что можно сделать? можно ли как-то сделать чтоб он сам понимал какой ему метод вызывать без containerFactory?
источник

S🕶

Sander 🕶 in pro.kafka
моя проблема всем понятна, сижу над проблемой больше суток -не могу заставить работать, требуется помощь(
источник

VG

Vik Gamov in pro.kafka
Ivan Ponomarev
@gamussa , не знаешь, Мэттиас будет на кафка саммите в Лондоне?
источник

OZ

Oleg Zinch in pro.kafka
незнаю как Метиас, но Влад Яма точно будет
источник

VG

Vik Gamov in pro.kafka
Oleg Zinch
незнаю как Метиас, но Влад Яма точно будет
Ну...
источник

VG

Vik Gamov in pro.kafka
Таких людей не знать
источник

VG

Vik Gamov in pro.kafka
Oleg Zinch
незнаю как Метиас, но Влад Яма точно будет
Who is Vlad кстати
источник

S🕶

Sander 🕶 in pro.kafka
прошу прощение, я писал вопрос по поводу Spring + Kafka, над проблемой уже долго сижу,
есть ли надежда что мне помогут?
источник

S🕶

Sander 🕶 in pro.kafka
Sander 🕶
если посмотреть на весь код, то как containerFactory, будет работать с разными типами: CreateTicket, CreateTicket2, CreateTicket3?
только с одним методом работает, а на каждый @KafkaHandler нельзя добавить containerFactory.

@KafkaListener(topics = "${producer.topic.create-ticket}", containerFactory = "${container.factory.create-ticket}")
@Service
public class TicketServiceCommandHandler {

@KafkaHandler
private void createTicket(CreateTicket command) {
             ....
}

      @KafkaHandler
private void createTicket(CreateTicketTest2 command) {
             ....
}

        @KafkaHandler
private void createTicket(CreateTicketTest3 command) {
             ....
}
}
хотя бы скажите если не понятно о чем я.
источник

IP

Ivan Ponomarev in pro.kafka
Какой serde используешь?
источник

S🕶

Sander 🕶 in pro.kafka
если убрать containerFactory, то выдает след ошибку.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition create-ticket-0 at offset 60. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.example.application.domain.command.CreateTicketCommandTest' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
источник

S🕶

Sander 🕶 in pro.kafka
Ivan Ponomarev
Какой serde используешь?
Spring Framework
источник