Size: a a a

2020 March 05

NR

Nikita Ryanov in pro.kafka
Такое можно сделать с помощью кафка стримов - агрегировать данные из топика в ktable, отбрасывая все новые значения.
На уровне брокера такого функционала пока нет
источник

AS

Aleksey Smirnov in pro.kafka
Nikita Ryanov
Такое можно сделать с помощью кафка стримов - агрегировать данные из топика в ktable, отбрасывая все новые значения.
На уровне брокера такого функционала пока нет
Спасибо, буду копать в эту сторону.
источник

NR

Nikita Ryanov in pro.kafka
А насколько в целом такой функционал был бы полезен кафке? Подобный кастомный retention policy, на первый взгляд, не сложно и довольно-таки "дешево" реализовать на основе существующего compacted и в итоге получить что-то вроде reverseCompacted, который оставлял бы самые старые записи по ключу, отбрасывая все новые.

Хотя в топике все равно будут временно видны новые записи =/
источник

AS

Aleksey Smirnov in pro.kafka
Вот и я когда описывал, первый термин что мне пришел в голову - reverse log compaction.
У меня такой кейс - в топик набираются события от пользователей, на которые нужно вызвать пересчет рекомендаций (нейронкой). Причем скорость обработки может быть небыстрой, а вот событий от пользователей может насыпать много, причем от одного и тогоже. И нет никакого смысла ставить этого пользователя в очередь на пересчет, если он там уже есть и причем поближе к моменту обработки. Отсюда - требование к уникальности ключей в топике, где старые записи важнее новых.
Я еще вообще не уверен что этот сценарий на кафку ложится, но начать решил с неё, как с нашего дефолтного брокера.
источник

AK

Alexey Konyaev in pro.kafka
Aleksey Smirnov
Привет. Подскажите, можно ли реализовать работу с топиком кафки так, чтобы в нём отбрасывались сообщения по ключу, если в топике уже есть сообщение с таким же ключом?
Привет!
можно вот так реализовать:
private void uniqueByKey(KStream<String, String> stream) {
       final String markForFirst = "!!!#";

       stream
               .peek((key, value) -> log.debug("INPUT: {} -> {}", key, value))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .reduce((first, second) -> {
                   log.debug("reduce: first = {}, second = {}", first, second);
                   return first.startsWith(markForFirst)
                           ? first
                           : markForFirst + first;
               })
               .toStream()
               .filter((key, value) -> !value.startsWith(markForFirst))
               .peek((key, value) -> log.debug("!!! OUTPUT: {} -> {}", key, value));
   }
источник

AK

Alexey Konyaev in pro.kafka
только тут я не заморачивался с тем, как пометить значение признаком "уже посчитали один раз" и просто добавил строковую метку.
источник

IR

Ivan Rasikhin in pro.kafka
мб в reduce просто возвращать first?
источник

IR

Ivan Rasikhin in pro.kafka
а нужно еще фильтровать
источник

AK

Alexey Konyaev in pro.kafka
можно, но тогда в OUTPUT этот first будет каждый раз вываливаться при поступлении нового события
источник

AK

Alexey Konyaev in pro.kafka
еще немного улучшил вариант выше:)
суть в том, что reduce() самое первое значение всегда "пропускает" как есть:

static final String STOP_VALUE = UUID.randomUUID().toString();
private KStream<String, String> firstByKey(KStream<String, String> stream) {
   return stream
           .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
           .reduce((first, second) -> STOP_VALUE)
           .toStream()
           .filter((key, value) -> !STOP_VALUE.equals(value))
           .peek((key, value) -> log.debug("OUTPUT: {} -> {}", key, value));
}
источник
2020 March 06

ЮХ

Юра Ходырев in pro.kafka
Привет.
Вопрос от нуба.
Кто как борется с приведением типов при использовании JDBC Source connector-ов?

Синопсис: У меня есть БД Oracle 12.2. Развернул kafka-connect от конфлюента, создал source сonnector с использованием драйвера OJDBC8. Тащу табличку, в которой поля с типом NUMBER(38,0) и TIMESTAMP(6) WITH TIME ZONE.

В кафке получаю трэш.
источник

ЮХ

Юра Ходырев in pro.kafka
Вычитал, что есть schema registry, но пока что не понял. Можно ли с помощью регистри один раз описать универсальное приведение типов и жить припеваючи?
источник

ЮХ

Юра Ходырев in pro.kafka
А все пардоньте, нашел ответ: numeric.mapping.
источник

IR

Ivan Rasikhin in pro.kafka
информации ради, мб кто не видел
https://github.com/streamthoughts/azkarra-streams
крутая обвязка вокруг kafka-streams добавляющая небольшой UI и хелс чеки
мб накидаете автору звездочек
источник
2020 March 07

VG

Vik Gamov in pro.kafka
Ivan Rasikhin
информации ради, мб кто не видел
https://github.com/streamthoughts/azkarra-streams
крутая обвязка вокруг kafka-streams добавляющая небольшой UI и хелс чеки
мб накидаете автору звездочек
Крутая да, чел будет на Кафка саммите в Лондоне
источник

IP

Ivan Ponomarev in pro.kafka
ого, это интересно
но, похоже, оно не для спринга

мы тут у себя пытаемся свелосипедить spring boot starter, в который выносим такие общие вещи, как обработку ошибок и метрики . Интересно будет посмотреть, что в Azkarra делают
источник

IP

Ivan Ponomarev in pro.kafka
Alexey Konyaev
еще немного улучшил вариант выше:)
суть в том, что reduce() самое первое значение всегда "пропускает" как есть:

static final String STOP_VALUE = UUID.randomUUID().toString();
private KStream<String, String> firstByKey(KStream<String, String> stream) {
   return stream
           .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
           .reduce((first, second) -> STOP_VALUE)
           .toStream()
           .filter((key, value) -> !STOP_VALUE.equals(value))
           .peek((key, value) -> log.debug("OUTPUT: {} -> {}", key, value));
}
В общем, как мы тут выяснили, этот вариант не работает, будьте осторожны!

Дело в кэшировании,   из-за которого условие .filter((key, value) -> !STOP_VALUE.equals(value)) может быть "съедено" и никогда не выполнено (просто обработчик .filter будет видеть уже только одно STOP_VALUE) Кафка-стримы -- это совсем  не  java-стримы, хоть и прикидываются оными
источник

IP

Ivan Ponomarev in pro.kafka
Сама исходная задача похожа на реализацию distinct в  Java-стримах.
Из-за "бесконечности" KStream, в духе Kafka Streams API было бы сделать Windowed Distinct.  
Нам такое тоже нужно, как дойдут руки, напишу KIP

А пока мы пытаемся сделать ValueTransformer с KVStore для решения этой задачи. Судя по первым экспериментам, эта идея работает. Но иметь Windowed Distinct в Streams API было бы правильнее
источник

IP

Ivan Ponomarev in pro.kafka
@gamussa , не знаешь, Мэттиас будет на кафка саммите в Лондоне?
источник

AS

Aleksey Smirnov in pro.kafka
Ivan Ponomarev
Сама исходная задача похожа на реализацию distinct в  Java-стримах.
Из-за "бесконечности" KStream, в духе Kafka Streams API было бы сделать Windowed Distinct.  
Нам такое тоже нужно, как дойдут руки, напишу KIP

А пока мы пытаемся сделать ValueTransformer с KVStore для решения этой задачи. Судя по первым экспериментам, эта идея работает. Но иметь Windowed Distinct в Streams API было бы правильнее
Вот мы у себя решили в итоге рядом держать Redis, на который будут смотреть и producer и consumer.
Producer перед добавлением сообщения в топик сходит в ключ Redis типа Set, проверит через SISMEMBER нет ли такого ключа уже, если нет - положит в топик и добавит в SET в Redis.
А consumer после успешной обработки сообщения и ACK - удалит ключ из SET в Redis.
Накладные расходы лишние, но что поделать, зато очень просто в реплизации.
источник