Size: a a a

2020 May 06

DT

Denis Tarasov in pro.kafka
в кассандре нет pub\sub
источник

AD

Alex Dev in pro.kafka
Ринат Харисов
Всем привет! Поделитесь опытом, пожалуйста. У нас на проекте используется Кафка, клиент на Java.
Есть задача отправлять эвенты клиентам по Server Sent Event. После подключения клиент должен получить все эвенты пока он был оффлайн, а затем продолжать получать сообщения, созданные после подключения.
Хочу реализовать следующим образом. Продюсеры, которые хотят уведомить клиента, отправляют сообщения в специальный топик. Когда клиент подключается, создаем нового Кафка консьюмера на основе id пользователя(каждый пользователь это отдельная консьюмер-группа), подписываемся на все партишены в топике. Затем запрашиваем батч сообщений из Кафки, отфильтровываем сообщения для клиента, отправляем их по SSE, затем фиксируем оффсет в Кафка и так до бесконечнеости. Если клиент отключается, то просто отписываемся от всех партишенов. Простой тест показал, что при 2 партишенах прочитать 1500 сообщений пользователя из 3 млн сообщений в топике одним консьюмером занимает примерно 1,5 - 2 мин, что не проходит по требованиям. Поэтому хочу увеличить количество портишенов(например, до 1024) У нас продюсеры  роутят сообщения по ключу. Консьюмер будет делать предположение в каком партишене искать сообщения на основе ключа и подписываться только на этот партишен.  

Есть такие вопросы:
1. Насколько плохо иметь большое количество консьюмеров с точки зрения Кафки. Допустим, их будет параллельно более 1000 на этот топик?
2. Вывезет ли Кафка такое количество партишенов и консьюмеров?
3. Есть в описанной схеме что-то что я не учел или можно сделать лучше?
4. В схеме с фиксированным количеством партишенов есть проблема, что нельзя будет их увеличить потом так, чтобы не потерять сообщения пользователя. Как бы это можно было бы решить?
мне кажется тут рисунок нужен
источник

AD

Alex Dev in pro.kafka
Denis Tarasov
я бы рассматривал кафку как промежуточный буфер и потоково сортировал нотификации, которые бы раскладывал в редис по ключам (user:natofication.type)
если нужно будет вынуть потом только свежие , то придется использовать scan например по lastupdate < date это не здорово
источник

DT

Denis Tarasov in pro.kafka
если выбрать правильную структуру sorted set, к примеру, то поиск будет O(1), а дальше работа с массивом на сороне сервиса
источник

DT

Denis Tarasov in pro.kafka
редис - это не только про ки велью
источник

MD

Maxim Davydov in pro.kafka
Maxim Davydov
всем привет, хотел узнать, есть ли какая-то возможность обработать ошибку о том, что не удалось подключиться к кафке, когда она пишет Connection to node could not be established. Broker may not be available.? Судя по сорцам, там нет никаого эксепшна и никак это обработать нельзя, в гугле тоже ничего не нашел, кроме нескольких сообщений о том, что так и задумано
вопрос все еще актуален
источник

OK

Oleg Kovalov in pro.kafka
Oleg Kovalov
sasl plain
запускаю такой вот KafkaContainer (от testcontainers)
kafkaContainer.withEnv("KAFKA_LISTENERS", "SASL_PLAINTEXT://:9092");
kafkaContainer.withEnv("KAFKA_ADVERTISED_LISTENERS", "SASL_PLAINTEXT://localhost:9092");
kafkaContainer.withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
kafkaContainer.withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=./kafka_server_jaas.conf");
kafkaContainer.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "SASL_PLAINTEXT");
kafkaContainer.withEnv("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT")
kafkaContainer.withEnv("KAFKA_SASL_MECHANISMS", "PLAIN")
kafkaContainer.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
kafkaContainer.withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAINTEXT") // !!!!!


получаю вот такое:
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAINTEXT, enabled mechanisms are []

оно ведь включено...
источник

OK

Oleg Kovalov in pro.kafka
Oleg Kovalov
запускаю такой вот KafkaContainer (от testcontainers)
kafkaContainer.withEnv("KAFKA_LISTENERS", "SASL_PLAINTEXT://:9092");
kafkaContainer.withEnv("KAFKA_ADVERTISED_LISTENERS", "SASL_PLAINTEXT://localhost:9092");
kafkaContainer.withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181");
kafkaContainer.withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=./kafka_server_jaas.conf");
kafkaContainer.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "SASL_PLAINTEXT");
kafkaContainer.withEnv("KAFKA_SECURITY_PROTOCOL", "SASL_PLAINTEXT")
kafkaContainer.withEnv("KAFKA_SASL_MECHANISMS", "PLAIN")
kafkaContainer.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
kafkaContainer.withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAINTEXT") // !!!!!


получаю вот такое:
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAINTEXT, enabled mechanisms are []

оно ведь включено...
источник

OK

Oleg Kovalov in pro.kafka
забавно то, что конфиг 1в1 с docker-compose, в котором это все ходит
источник

ЮХ

Юра Ходырев... in pro.kafka
Oleg Kovalov
забавно то, что конфиг 1в1 с docker-compose, в котором это все ходит
У меня глупый вопрос. А так и должно быть что у тебя часть переменных "PLAIN"?
источник

ЮХ

Юра Ходырев... in pro.kafka
Ошибка говорит, что не удалось, видимо, разобрать твои переменные
источник

N

N in pro.kafka
Гайз подскажите плиз в Spring Cloud Streams пацики решают проблему ребалансига групп ??
К чему вопрос - возникла странная ситуация, по какой то из причин вычли с кафки повторно один и тот же месседж дважды и отхватили с базы  ERROR: duplicate key
Может ли эт быть связано с тем что был вызван poll и вычтено N меседжей первый из них сохранился в базу, но произошел ребалансинг и офсет не закомитился и соответственно новый консюмер перечитал этот месседж ??
источник

OK

Oleg Kovalov in pro.kafka
Юра Ходырев
Ошибка говорит, что не удалось, видимо, разобрать твои переменные
меня пустой список (enabled mechanisms are []) добивает
источник

OK

Oleg Kovalov in pro.kafka
выходит что да, оно не зачитало конфиг, но..в соседнем компоузе проходит
источник

VG

Vik Gamov in pro.kafka
Oleg Kovalov
забавно то, что конфиг 1в1 с docker-compose, в котором это все ходит
что за композ?
источник

OK

Oleg Kovalov in pro.kafka
Vik Gamov
что за композ?
источник

VG

Vik Gamov in pro.kafka
TC Не использует wurstmeister
TC используюет правильный образ от
Confluent
источник

OK

Oleg Kovalov in pro.kafka
Vik Gamov
TC Не использует wurstmeister
TC используюет правильный образ от
Confluent
хмм
источник

VG

Vik Gamov in pro.kafka
Такое да
источник

OK

Oleg Kovalov in pro.kafka
Vik Gamov
Такое да
да, пожалуй логично, что не сходится, я даж не подумал
источник