Size: a a a

2021 January 08

N

Nikolay in pro.kafka
Sergey Sokolov
Если по одновременным подключениям, то не думаю что больше 200, но потом появятся IoT. Но у нас есть уже задача реализовать что то наподобие совместного редактирования документа, отображением изменений онлайн, поэтому стали думать в сторону EventSourcing.
не совсем понятно. Если основная задача - реализовать совместное редактирование , то можно взять CRDT?
источник

I

Ilgiz in pro.kafka
Sergey Sokolov
Добрый день, подскажите пожалуйста бест практис,
Задача в том чтобы реализовать взаимоедействие вебклиента с микросервисным бакендом, где микросервисы общаются  между собой с помощью Kafka.

вопрос в первую очередь, как реализовать синхронное взаимодействие в виде request/reply ?

Думали сделать такой вариант: веб клиент генерит corellationId и выполняет запрос к бэку например createBook,
далее логика такая
1. бакнед подписывается на топик кафка book-created  спомощью KafkaStreams делает filter по key=correlationId
2. бакнед отпарвляет команду CreateBookCmd в топик кафка book-create и среди прочего передает correlationId
3. далее какой-то консьюмер обрабатывает команду из топика book-create, создает Book и генерит BookCreatedEvent  в топик book-created
4. бакенд получате событие из топика book-created с нужным corellationId и затем отдает ответ на вебклиента.

в таком сценарии получается что на каждый запрос  вебклиента создается новый короткоживущий экземпляр KafkaStreams и вызывается start , stop
вопрос насколько ресурсоемко создание экземпляров KafkaStreams на каждый запрос от вебклиента?
как я понимаю такое решение аналогично созданию консьюмера для топика  book-created с последующим чтением всех событий и отбрасыванию с несоотвсвующими corellationId.
Соответсвенно вопрос  насколько ресурсоемко создание коньюмера на каждый запрос от вебклиента?

Может быть есть более правильные паттрены (что бы уйти от request/reply на фронте)?
Зачем создавать короткоживущие клиенты, почему нельзя сделать долгоживущие? То есть в api сервисе хранить список клиентов (открытых соединений) и когда из топика прилетает создание, то отвечать соответствующему клиенту. На первых порах можно чтобы каждый инстанс сервиса читал полный топик создания, в последствии можно реализовать кастомный партишонер. Но как писали выше - эту версию придется реализовать для каждого ЯП
источник

AK

Alexander Koval in pro.kafka
Yuri Zavyalov
Сделали подобное решение (request/reply через kafka) - работает очень надёжно, однако пришлось реализовать свою библиотеку, которая закрывает от разработчика все хитрости обмена (подписка на партиции нужных response-топиков, поиск и закрепление нужных партиций, разбивка и склейка фреймов данных)
Делаем похожее решение.
Сейчас заниманимаемся развитием. Добавляем тесты.
Как раз есть несколько прототипов с использованием https://www.testcontainers.org/ и апи как в https://livy.apache.org/docs/latest/rest-api.html.
https://github.com/kartzum/s-space/tree/main/r-streams (micronaut), https://github.com/kartzum/s-space/tree/main/s-streams (spring boot), для play пока не готово.
Остановились на таком решение, потому что клиенты пишутся легко и реализуется как простой автомат.
Был вариант с вебсокетами (см. https://github.com/kartzum/s-space/tree/main/fastapi-s/mq-a-proxy), т.к. первые клиенты должны были быть на питоне, поэтому прототип на питоне. Выглядит так, что использовать вебсокеты, гораздо удобнее на клиентах. Остались открытие вопросы: как решение будет работать за прокси? будет ли хорошо масштабироваться?

Если есть подобные решения с + и -, буду признателен за информацию.
источник

SS

Sergey Sokolov in pro.kafka
Phil Delgyado
А зачем вообще kstream?
фильтрация сообщений в топике делается оч просто с помощью стримов

KStream<String, String> source = builder.stream("book-created");
KStream<String, String> filtered = source.filter((key, val) -> key.equals(correlationId));
       
       filtered.foreach((key,value)->{
           System.out.println(value);
       });
источник

PD

Phil Delgyado in pro.kafka
Sergey Sokolov
фильтрация сообщений в топике делается оч просто с помощью стримов

KStream<String, String> source = builder.stream("book-created");
KStream<String, String> filtered = source.filter((key, val) -> key.equals(correlationId));
       
       filtered.foreach((key,value)->{
           System.out.println(value);
       });
А что при этом внутри происходит?
источник

VG

Vik Gamov in pro.kafka
Phil Delgyado
А что при этом внутри происходит?
Не происходит - результат в новый топик записывается (если надо)
источник

N

Nikolay in pro.kafka
но он же все на клиента тянет и на клиенте фильтрует, а не на брокере?
источник

PD

Phil Delgyado in pro.kafka
Vik Gamov
Не происходит - результат в новый топик записывается (если надо)
Это еще в кафке или уже в rocksdb?
источник

В

Вадим in pro.kafka
Ilgiz
Зачем создавать короткоживущие клиенты, почему нельзя сделать долгоживущие? То есть в api сервисе хранить список клиентов (открытых соединений) и когда из топика прилетает создание, то отвечать соответствующему клиенту. На первых порах можно чтобы каждый инстанс сервиса читал полный топик создания, в последствии можно реализовать кастомный партишонер. Но как писали выше - эту версию придется реализовать для каждого ЯП
упал сервис - пропало состояние
источник

SS

Sergey Sokolov in pro.kafka
Nikolay
не совсем понятно. Если основная задача - реализовать совместное редактирование , то можно взять CRDT?
Ну это не совсем основная задача,это один из юзкейзов, да CRDT тоже интересно, но пока не рассматривали (вот интересный фреймворк на эту тему https://cloudstate.io).
С одной стороны хочется не оверинжинирить, а с другой понятно, что без  работы с событиями сложно что то сделать.
источник

SS

Sergey Sokolov in pro.kafka
Ilgiz
Зачем создавать короткоживущие клиенты, почему нельзя сделать долгоживущие? То есть в api сервисе хранить список клиентов (открытых соединений) и когда из топика прилетает создание, то отвечать соответствующему клиенту. На первых порах можно чтобы каждый инстанс сервиса читал полный топик создания, в последствии можно реализовать кастомный партишонер. Но как писали выше - эту версию придется реализовать для каждого ЯП
Да, в эту сторону тоже надо будет посмотреть, действиетльно если есть мапа correlationId->session , то можно  читать полностью топик и в нужную сессию отправлять событие. Главное только держать в актуальном состоянии эту мапу.
источник
2021 January 09

VG

Vik Gamov in pro.kafka
Nikolay
но он же все на клиента тянет и на клиенте фильтрует, а не на брокере?
Все так.
источник

VG

Vik Gamov in pro.kafka
Phil Delgyado
Это еще в кафке или уже в rocksdb?
Java приложение. Для фильтрации rocksdb не нужен
источник

PD

Phil Delgyado in pro.kafka
Vik Gamov
Java приложение. Для фильтрации rocksdb не нужен
то есть это просто сахар над пропуском получаемых консьюмером сообщений? Или тут оффсет не сдвигается?
Т.е. как это реализовано, какие внутри гарантии и производительность?
источник

VG

Vik Gamov in pro.kafka
Phil Delgyado
то есть это просто сахар над пропуском получаемых консьюмером сообщений? Или тут оффсет не сдвигается?
Т.е. как это реализовано, какие внутри гарантии и производительность?
а как ещё? Кафка это immutable log, не ESB
источник

VG

Vik Gamov in pro.kafka
Phil Delgyado
то есть это просто сахар над пропуском получаемых консьюмером сообщений? Или тут оффсет не сдвигается?
Т.е. как это реализовано, какие внутри гарантии и производительность?
Оффсет будет свой так как это будет отдельная consumer group
источник

PD

Phil Delgyado in pro.kafka
Но на каждый KStream создается своя consumer group, да?
источник

VG

Vik Gamov in pro.kafka
Phil Delgyado
Но на каждый KStream создается своя consumer group, да?
Да
источник

PD

Phil Delgyado in pro.kafka
О, ответил быстрее, чем я спросил. Ага, просто сахар )
источник

VG

Vik Gamov in pro.kafka
Phil Delgyado
Но на каждый KStream создается своя consumer group, да?
У каждого Kafka streams приложения задаешь application.id который который транслируется в том числе в consumer group
источник