Size: a a a

2021 January 09

PF

Peter Fitenko in pro.kafka
Всем привет. Подскажите пожалуйста, произошла такая ситуация. Тестировал работоспособность кафка сорс коннектора, 1 таблица в 1 топик. В базу заранее было записано 5 000 000 записей, запустил коннектор. Все крутится в кубере, во время переливки этих 5кк записей я несколько раз перезапускал kafka-connect-server под (раз 5). По итогу записей в топике появилось 7 500 000, а не 5 000 000. Понятно почему, ведь за все это время в топик офсетов было всего около 5 записей, Вопрос: можно ли как-то задать в настройках (jdbc коннектор стандартный), что бы offset писался в кафку по 1 записи, а не ограмными пачками, что бы избежать дубликатов, если кафка коннект упал, а оффсет не закомитился (ну или хотя бы минимизировать риски до 1 дубликата, а не 50%)?
источник

G

Gosha in pro.kafka
ReplyingKafkaTemplate не рассматривали? https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1
источник

SS

Sergey Sokolov in pro.kafka
Да, рассматривали. Но пока не используем спринг.
источник

G

Gosha in pro.kafka
Sergey Sokolov
Добрый день, подскажите пожалуйста бест практис,
Задача в том чтобы реализовать взаимоедействие вебклиента с микросервисным бакендом, где микросервисы общаются  между собой с помощью Kafka.

вопрос в первую очередь, как реализовать синхронное взаимодействие в виде request/reply ?

Думали сделать такой вариант: веб клиент генерит corellationId и выполняет запрос к бэку например createBook,
далее логика такая
1. бакнед подписывается на топик кафка book-created  спомощью KafkaStreams делает filter по key=correlationId
2. бакнед отпарвляет команду CreateBookCmd в топик кафка book-create и среди прочего передает correlationId
3. далее какой-то консьюмер обрабатывает команду из топика book-create, создает Book и генерит BookCreatedEvent  в топик book-created
4. бакенд получате событие из топика book-created с нужным corellationId и затем отдает ответ на вебклиента.

в таком сценарии получается что на каждый запрос  вебклиента создается новый короткоживущий экземпляр KafkaStreams и вызывается start , stop
вопрос насколько ресурсоемко создание экземпляров KafkaStreams на каждый запрос от вебклиента?
как я понимаю такое решение аналогично созданию консьюмера для топика  book-created с последующим чтением всех событий и отбрасыванию с несоотвсвующими corellationId.
Соответсвенно вопрос  насколько ресурсоемко создание коньюмера на каждый запрос от вебклиента?

Может быть есть более правильные паттрены (что бы уйти от request/reply на фронте)?
Вряд ли бестпрактис уже сформировались. И много факторов есть, которые влияют на выбор решения. Добавлю свои 5 копеек.
1. Зачем столько разных топиков? Можно реквест-респонсы складывать в один, тогда сообщения с одинаковым ключом будут в одном партишне и упорядочены по времени. Консьюмеры будут фильтровать по ключу/хидерам и брать то, что нужно им, это достаточно дешево. В блоге конфлюент есть несколько постов на эту тему.
2. В чем профит создавать короткоживущий стрим? Выглядит как усложнение в описанном кейсе. Если все происходит в рамках одного реквест-респонса с фронта, то можно обойтись консьюмером из топика/партишна и хоть в памяти сервиса держать мапу с correlationId реквестов, так как шарить ее между несколькими инстансами смысла особого нет. Рядом можно список держать с отсортированными по времени реквестами, который пригодится для чистки мапы. Вариант плох тем, что поток должен будет простаивать и проверять мапу в ожидании ответа из топика, чтобы потом его отдать на фронт. Либо нужно менять схему реквест-респонс, и, например, процесс запускать с фронта на одном эндпоинте, а потом результат спрашивать на другом, опрашивая в цикле сервис, либо вебсокеты, как уже писали выше. Но тогда уже нужно шарить данные между инстансами и использовать бд или редис какой-нибудь.
источник

AN

Artem Nenashev in pro.kafka
Gosha
Вряд ли бестпрактис уже сформировались. И много факторов есть, которые влияют на выбор решения. Добавлю свои 5 копеек.
1. Зачем столько разных топиков? Можно реквест-респонсы складывать в один, тогда сообщения с одинаковым ключом будут в одном партишне и упорядочены по времени. Консьюмеры будут фильтровать по ключу/хидерам и брать то, что нужно им, это достаточно дешево. В блоге конфлюент есть несколько постов на эту тему.
2. В чем профит создавать короткоживущий стрим? Выглядит как усложнение в описанном кейсе. Если все происходит в рамках одного реквест-респонса с фронта, то можно обойтись консьюмером из топика/партишна и хоть в памяти сервиса держать мапу с correlationId реквестов, так как шарить ее между несколькими инстансами смысла особого нет. Рядом можно список держать с отсортированными по времени реквестами, который пригодится для чистки мапы. Вариант плох тем, что поток должен будет простаивать и проверять мапу в ожидании ответа из топика, чтобы потом его отдать на фронт. Либо нужно менять схему реквест-респонс, и, например, процесс запускать с фронта на одном эндпоинте, а потом результат спрашивать на другом, опрашивая в цикле сервис, либо вебсокеты, как уже писали выше. Но тогда уже нужно шарить данные между инстансами и использовать бд или редис какой-нибудь.
> тогда сообщения с одинаковым ключом будут в одном партишне и упорядочены по времени

так здесь нет ни какого профита от сохранения порядка, а вот skip'апть сообщения не предназначенные для обработки consum'ером respon'зов может и не так дорого, но и можно этого избежать

> Вариант плох тем, что поток должен будет простаивать и проверять мапу в ожидании ответа из топика

ну а зачем так плохо делать, если можно сделать хорошо ) не надо ждать в потоке, пока придет response, можно вернуть его в пулл и поставить какой-нибудь асинхронный lock и уже реактивно его снять, когда придет response. другой вопрос, что там надо будет учитывать всякие тайматы http-соединения, но это уже особенности реализации
источник

G

Gosha in pro.kafka
Artem Nenashev
> тогда сообщения с одинаковым ключом будут в одном партишне и упорядочены по времени

так здесь нет ни какого профита от сохранения порядка, а вот skip'апть сообщения не предназначенные для обработки consum'ером respon'зов может и не так дорого, но и можно этого избежать

> Вариант плох тем, что поток должен будет простаивать и проверять мапу в ожидании ответа из топика

ну а зачем так плохо делать, если можно сделать хорошо ) не надо ждать в потоке, пока придет response, можно вернуть его в пулл и поставить какой-нибудь асинхронный lock и уже реактивно его снять, когда придет response. другой вопрос, что там надо будет учитывать всякие тайматы http-соединения, но это уже особенности реализации
1. Зависит от того, как дальше планируется данные использовать. Еще при отладке/поиске проблем без доп инструментов пару реквест-респонс удобнее смотреть в одном топике и на одном партишне. В экстремальных случаях с тысячами топиков такое кол-во может аффектить на перформанс. В общем, как лучше раскидать данные по топикам это отдельная большая тема.
2. Скорее всего, даже в каком-нибудь реактивном фреймворке можно посмотреть решение и сделать по аналогии. Но задачка нетривиальная для обычного крудописателя. И еще вопрос, стоит ли оно того.
источник
2021 January 11

M

Mr. Brain in pro.kafka
добрый день, подскажите пожалуйста, если закрыть кафку логином и паролем, можно ли будет со стороны косюмера узнать с какого логина пришло сообщение?
источник

DD

Dmitry Dobrynin in pro.kafka
Mr. Brain
добрый день, подскажите пожалуйста, если закрыть кафку логином и паролем, можно ли будет со стороны косюмера узнать с какого логина пришло сообщение?
Привет. В метаинформацию (хедеры) на продьюсере запихивай инфу о продьюсере. Штатных средств для этого вроде нет, да они и не нужны, ибо может вызвать секьюрити leak в некоторых случаях
источник

I

Ivan in pro.kafka
C sarama producer, кто знает как бороться с assertion failed: message out of sequence added to a batch ?
источник
2021 January 12

VG

Vik Gamov in pro.kafka
Переслано от Denis Pavlyuchenko
источник

H

HipJoy in pro.kafka
привет
кто-то сталкивался с "Coordinator selected invalid assignment protocol: null" ?
как бороться с этим? =)
источник

A

Artjom Kalita in pro.kafka
норкоманство какое-то
источник

SS

Sergey Sokolov in pro.kafka
Gosha
Вряд ли бестпрактис уже сформировались. И много факторов есть, которые влияют на выбор решения. Добавлю свои 5 копеек.
1. Зачем столько разных топиков? Можно реквест-респонсы складывать в один, тогда сообщения с одинаковым ключом будут в одном партишне и упорядочены по времени. Консьюмеры будут фильтровать по ключу/хидерам и брать то, что нужно им, это достаточно дешево. В блоге конфлюент есть несколько постов на эту тему.
2. В чем профит создавать короткоживущий стрим? Выглядит как усложнение в описанном кейсе. Если все происходит в рамках одного реквест-респонса с фронта, то можно обойтись консьюмером из топика/партишна и хоть в памяти сервиса держать мапу с correlationId реквестов, так как шарить ее между несколькими инстансами смысла особого нет. Рядом можно список держать с отсортированными по времени реквестами, который пригодится для чистки мапы. Вариант плох тем, что поток должен будет простаивать и проверять мапу в ожидании ответа из топика, чтобы потом его отдать на фронт. Либо нужно менять схему реквест-респонс, и, например, процесс запускать с фронта на одном эндпоинте, а потом результат спрашивать на другом, опрашивая в цикле сервис, либо вебсокеты, как уже писали выше. Но тогда уже нужно шарить данные между инстансами и использовать бд или редис какой-нибудь.
источник
2021 January 13

VC

Vladimir Chernyi in pro.kafka
Коллеги, может быть кто-то сможет подсказать. Есть сорс коннектор, заливающий строки из реляционной таблицы в топик с тремя партициями, работает ок. В коннекторе указан mode:bulk и sql запрос. Я выполняю PUT на порт 8083, коннектор начинает работать. Как понять, что все строки из запроса добавлены в топик, то есть коннектор закончил работу и ждет?
источник

IK

Ilya Kaznacheev in pro.kafka
Не совсем в тему кафки, но в тему синхронных/асинхронных взаимодействий с фронтом:
А кто-нибудь знает технологии кроме WebSocket и поллинга, чтобы асинхронно отвечать фронту в веб?
источник

EB

Eugene Bosiakov in pro.kafka
Ilya Kaznacheev
Не совсем в тему кафки, но в тему синхронных/асинхронных взаимодействий с фронтом:
А кто-нибудь знает технологии кроме WebSocket и поллинга, чтобы асинхронно отвечать фронту в веб?
браузеры только в WS и HTTP умеют
источник

MB

Muslim Beibytuly in pro.kafka
Ilya Kaznacheev
Не совсем в тему кафки, но в тему синхронных/асинхронных взаимодействий с фронтом:
А кто-нибудь знает технологии кроме WebSocket и поллинга, чтобы асинхронно отвечать фронту в веб?
Http/2 - grpc
источник

IK

Ilya Kaznacheev in pro.kafka
Muslim Beibytuly
Http/2 - grpc
Паттерн использования такой же будет - поллинг
источник

DP

Denis Pavlyuchenko in pro.kafka
Ilya Kaznacheev
Не совсем в тему кафки, но в тему синхронных/асинхронных взаимодействий с фронтом:
А кто-нибудь знает технологии кроме WebSocket и поллинга, чтобы асинхронно отвечать фронту в веб?
источник

ЮХ

Юра Ходырев... in pro.kafka
Vladimir Chernyi
Коллеги, может быть кто-то сможет подсказать. Есть сорс коннектор, заливающий строки из реляционной таблицы в топик с тремя партициями, работает ок. В коннекторе указан mode:bulk и sql запрос. Я выполняю PUT на порт 8083, коннектор начинает работать. Как понять, что все строки из запроса добавлены в топик, то есть коннектор закончил работу и ждет?
Думаю, что можно по статистике топика посмотреть поступают ли новые сообщения.
Либо смотреть логи/JMX статистику самого коннектора.
источник