Size: a a a

2020 November 03

IZ

Igor Zubchenok in pro.kafka
Ринат Харисов
а клиенты это что? как с ними работаете?
и нужно ли обеспечить получение клиентами данных если они в момент получения сообщения из кафки были не подключены?
клиенты - мобильные приложения
да, нужно, если связь рвется, то надо дослать оставшееся на реконнекте клиента
источник

РХ

Ринат Харисов... in pro.kafka
Igor Zubchenok
клиенты - мобильные приложения
да, нужно, если связь рвется, то надо дослать оставшееся на реконнекте клиента
Мы решили у себя подобную задачу так:

Клиент подключается по SSE, после подключения создается новый кафка консьюмер. Id юзера используется как консьюмер группа. Считываем все сообщения из всех топиков с сохраненного оффсета.
Если оффсет не сохранен, считаем с первого доступного оффсета. Отфильтровываем сообщения только для клиента. У нас не очень большая нагрузка, потому работает достаточно быстро. Получили сообщение - пульнули клиенту и закоммитили оффсет.
источник

VG

Vik Gamov in pro.kafka
Igor Zubchenok
клиенты - мобильные приложения
да, нужно, если связь рвется, то надо дослать оставшееся на реконнекте клиента
Хочешь напрямую к Кафке подключить?
источник

IZ

Igor Zubchenok in pro.kafka
Ринат Харисов
Мы решили у себя подобную задачу так:

Клиент подключается по SSE, после подключения создается новый кафка консьюмер. Id юзера используется как консьюмер группа. Считываем все сообщения из всех топиков с сохраненного оффсета.
Если оффсет не сохранен, считаем с первого доступного оффсета. Отфильтровываем сообщения только для клиента. У нас не очень большая нагрузка, потому работает достаточно быстро. Получили сообщение - пульнули клиенту и закоммитили оффсет.
У меня много клиентов, придется сканировать слишком много лишнего. 🙁
источник

IZ

Igor Zubchenok in pro.kafka
Vik Gamov
Хочешь напрямую к Кафке подключить?
Не собирался так, но можно. Я думаю тоже можно и напрямую, если такое сработает.
источник

DP

Denis Pavlyuchenko in pro.kafka
Igor Zubchenok
Как сделать стриминг данных юзерам через кафку (каждому юзеру свой стрим, юзеров много, сотни тысяч)? На каждого по топику не создашь... HELP!
(или это неадекватный вопрос - дайте знать)
одно из решений - поглядеть на пульсар. Там можно будет создать по топику на юзера, в отличие от кафки (но у пульсара свои проблемы есть тоже, да)
источник

РХ

Ринат Харисов... in pro.kafka
Igor Zubchenok
клиенты - мобильные приложения
да, нужно, если связь рвется, то надо дослать оставшееся на реконнекте клиента
насколько знаю такие вещи обычно решаются связкой бд + кафка. Если клиент активен в момент получения сообщения то сразу отдаем сообщение ему. Если нет, то пишем в бд. Когда подключается клиент, идем в бд за пропущенными сообщениями, а потом уже отдаем сообщения из кафки.  Реактивными операторами удобно мерджить стримы из бд и кафки.
источник

IZ

Igor Zubchenok in pro.kafka
Ринат Харисов
насколько знаю такие вещи обычно решаются связкой бд + кафка. Если клиент активен в момент получения сообщения то сразу отдаем сообщение ему. Если нет, то пишем в бд. Когда подключается клиент, идем в бд за пропущенными сообщениями, а потом уже отдаем сообщения из кафки.  Реактивными операторами удобно мерджить стримы из бд и кафки.
клиентов много и они могут подключаться к другим бекенд инстансам на реконнекте. но то, что клиентов много и инстансов много - не решает вопрос больших сканирований или большого кол-ва скипнутых сообщений
источник

VG

Vik Gamov in pro.kafka
Igor Zubchenok
Не собирался так, но можно. Я думаю тоже можно и напрямую, если такое сработает.
Не оч хорошая идея
источник

VG

Vik Gamov in pro.kafka
Denis Pavlyuchenko
одно из решений - поглядеть на пульсар. Там можно будет создать по топику на юзера, в отличие от кафки (но у пульсара свои проблемы есть тоже, да)
Нет
источник

DP

Denis Pavlyuchenko in pro.kafka
Vik Gamov
Нет
а, у пульсара уже нет проблем?
источник

VG

Vik Gamov in pro.kafka
Ринат Харисов
Мы решили у себя подобную задачу так:

Клиент подключается по SSE, после подключения создается новый кафка консьюмер. Id юзера используется как консьюмер группа. Считываем все сообщения из всех топиков с сохраненного оффсета.
Если оффсет не сохранен, считаем с первого доступного оффсета. Отфильтровываем сообщения только для клиента. У нас не очень большая нагрузка, потому работает достаточно быстро. Получили сообщение - пульнули клиенту и закоммитили оффсет.
Выглядит годнл
источник

IZ

Igor Zubchenok in pro.kafka
Vik Gamov
Выглядит годнл
Но там фильтрация сообщений только для клиента. Слишком много фильтраций если много инстансов и много подключенных клиентов к этим инстансам, которые запрашивают свои стримы.
источник

IZ

Igor Zubchenok in pro.kafka
Мне казалось это должна быть стандартная задача для кафки. Почему так сложно придумать скалируемый солюшн?
источник

IZ

Igor Zubchenok in pro.kafka
🛑 После обсуждений всяких пришел к выводу, что стриминг в сторону большого кол-ва юзеров - этот юзкейс не для кафки совсем.
источник

P

Pasha in pro.kafka
Привет
Делаю через processor api несколько нод процессоров
В одну добавляю глобальное хранилище, в следующую -  обычное
Но обычное почему-то возвращает нулл в init() методе

keyValueStore в следующем процессоре, где вся логика, не может найтись. Метод init выглядит так:
@Override
   public void init(ProcessorContext processorContext) {
       super.init(processorContext);
       this.keyValueStore = (KeyValueStore) context().getStateStore(stateStoreName);
   }
Но context().getStateStore(stateStoreName); возвращает null.

Топология такая
topology.addGlobalStore(storeKeyBuilder, transactionSourceNodeName, stringDeserializer, transactionDeserializer,
               inTopic, fakeProcessorNodeName, transactionSupplier)
               .addProcessor(transactionProcessorNodeName, () -> new TransactionProcessor(transactionsStateStore, bonus, bonusNum),
                       fakeProcessorNodeName)
               .addStateStore(storeBuilder, transactionProcessorNodeName)
               .addSink(transactionSinkNodeName, "analytics", stringSerializer, transactionPerformanceSerializer, transactionProcessorNodeName);

Кончено, я подумал, что виной всему наличие глобального хранилища. Но однозначных подтверждений в интернете не нашел. Да и какой смысл в промежуточном глобальном хранилище, если больше никаким нельзя будет пользоваться. Я смотрел внимательно свой код, но нигде не смог найти подвох.
Вопрос: неужели нельзя сделать два хранилища для двух нод? Или я что-то не так делаю
источник

P

Pasha in pro.kafka
Хранилища делаю так:
KeyValueBytesStoreSupplier storeKeySupplier = Stores.inMemoryKeyValueStore(transactionsKeyStateStore);
       StoreBuilder<KeyValueStore<String, Transaction>> storeKeyBuilder =
               Stores.keyValueStoreBuilder(storeKeySupplier, stringSerde, transactionSerde).withLoggingDisabled();

       KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(transactionsStateStore);
       StoreBuilder<KeyValueStore<String, TransactionPerformance>> storeBuilder =
               Stores.keyValueStoreBuilder(storeSupplier, stringSerde, transactionPerformanceSerde);
источник

ER

Evgeny Rachlenko in pro.kafka
Pasha
Хранилища делаю так:
KeyValueBytesStoreSupplier storeKeySupplier = Stores.inMemoryKeyValueStore(transactionsKeyStateStore);
       StoreBuilder<KeyValueStore<String, Transaction>> storeKeyBuilder =
               Stores.keyValueStoreBuilder(storeKeySupplier, stringSerde, transactionSerde).withLoggingDisabled();

       KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(transactionsStateStore);
       StoreBuilder<KeyValueStore<String, TransactionPerformance>> storeBuilder =
               Stores.keyValueStoreBuilder(storeSupplier, stringSerde, transactionPerformanceSerde);
Всем привет. У меня возник вопрос про красивый
shutdown для kafka. очень важно что бы она консистентно уходила из k8s. корректно заканчивая последние сессии. для этого наверное на producer все нужно обворачивать транзакциями ? а что же делать на стороне consumers ? или есть какой то другой способ?
источник

VG

Vik Gamov in pro.kafka
>  вопрос про красивый
источник

ER

Evgeny Rachlenko in pro.kafka
очень важно не поломать данные
источник