Привет. Подскажите, можно ли реализовать работу с топиком кафки так, чтобы в нём отбрасывались сообщения по ключу, если в топике уже есть сообщение с таким же ключом?
Привет!
можно вот так реализовать:
private void uniqueByKey(KStream<String, String> stream) {
final String markForFirst = "!!!#";
stream
.peek((key, value) -> log.debug("INPUT: {} -> {}", key, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.reduce((first, second) -> {
log.debug("reduce: first = {}, second = {}", first, second);
return first.startsWith(markForFirst)
? first
: markForFirst + first;
})
.toStream()
.filter((key, value) -> !value.startsWith(markForFirst))
.peek((key, value) -> log.debug("!!! OUTPUT: {} -> {}", key, value));
}