Size: a a a

2020 October 12

VG

Vik Gamov in pro.kafka
Nikolay
Кто знает . Брокеры делают long poling или short polling ?
В смысле?
источник

N

Nikolay in pro.kafka
Vik Gamov
В смысле?
брокер читает с лидера  в тот момент, когда на лидере нет новых данных для брокера. если это просто polling, то ответ вернется сразу - данных нет, а если long polling то ответ вернется, когда появятся данные ( или будет превышен интервал ожидания)
источник

N

Nikolay in pro.kafka
так понимаю, что с брокера просто pooling.
источник

N

Nikolay in pro.kafka
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
     val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
     partitionMap.foreach { case (partition, state) =>
       if (state.isReadyForFetch) {
         val replicaState = replicaPartitionState(partition)
         fetchData.put(partition, new FetchRequest.PartitionData(state.fetchOffset, replicaState.logStartOffset,
           1024 * 1024, Optional.of[Integer](state.currentLeaderEpoch)))
       }
     }
     val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
     ResultWithPartitions(Some(fetchRequest), Set.empty)
   }
источник

N

Nikolay in pro.kafka
вот тут val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
источник

N

Nikolay in pro.kafka
не. ведь он ждет до 1го байта. так ведь? значит long polling?
источник

N

Nikolay in pro.kafka
хотя нет . вот нашел вроде  // respond immediately if 1) fetch request does not want to wait
   //                        2) fetch request does not require any data
   //                        3) has enough data to respond
   //                        4) some error happens while reading data
   //                        5) any of the requested partitions need HW update
   if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate)
источник

N

Nikolay in pro.kafka
тогда что выходит, что реплика постоянно бомбит лидера запросами без всяких ожиданий?
источник

N

Nikolay in pro.kafka
опа. не туда смотрел. тот код с нулем был из теста. если кто читал сообщения, то пардон. там или 1 байт или replica.fetch.wait.max.ms и вот документ  https://cwiki.apache.org/confluence/display/KAFKA/Purgatory+Redesign+Proposal   "A fetch request with min.bytes=1 won't be answered until there is at least one new byte of data for the consumer to consume. This allows a "long poll" so that the consumer need not busy wait checking for new data to arrive"
источник

S

Slava in pro.kafka
Nikolay
опа. не туда смотрел. тот код с нулем был из теста. если кто читал сообщения, то пардон. там или 1 байт или replica.fetch.wait.max.ms и вот документ  https://cwiki.apache.org/confluence/display/KAFKA/Purgatory+Redesign+Proposal   "A fetch request with min.bytes=1 won't be answered until there is at least one new byte of data for the consumer to consume. This allows a "long poll" so that the consumer need not busy wait checking for new data to arrive"
в телеграме можно без пардонов и мерси удалять сообщения, прогресс уже там
источник
2020 October 13

РМ

Руслан Маркелов... in pro.kafka
Oleg Nyrkov
Сижу пишу велосипед, может кто написал уже? Вычитываем из кафки сообщения и раскладываем по локальным очередям, сейчас коммит делаем после того как положили сообщения в локальную очередь. Вот решили что можем потерять сообщения если упадем, и делать более умный управляемый коммит.  Может знает уже готовое, кейс вроде стандартный, должно же где то быть уже реализованное
Мы параллельно обрабатываем сообщения вычитанные из партиций кафки в очередь. Обработка так гораздо быстрее чем последовательно. Но при этом для коммита надо складывать обработанные оффсеты в непрерывную последовательность и потом коммитить иначе при восстановлении в случае сбоя не обработанные сообщения могут быть потеряны. Также приходится очень тщательно обрабатывать ошибки и следить за тем что код занимающийся параллельной обработкой не должен зависать. Если одно сообщение зависнет вас может порвать по памяти при накоплении непрерывной последовательности. Готового решения не встречал. Но в доке кафки написано что если захочется параллельной обработки партиции то надо делать примерно так как я описал.
источник

ON

Oleg Nyrkov in pro.kafka
Руслан Маркелов
Мы параллельно обрабатываем сообщения вычитанные из партиций кафки в очередь. Обработка так гораздо быстрее чем последовательно. Но при этом для коммита надо складывать обработанные оффсеты в непрерывную последовательность и потом коммитить иначе при восстановлении в случае сбоя не обработанные сообщения могут быть потеряны. Также приходится очень тщательно обрабатывать ошибки и следить за тем что код занимающийся параллельной обработкой не должен зависать. Если одно сообщение зависнет вас может порвать по памяти при накоплении непрерывной последовательности. Готового решения не встречал. Но в доке кафки написано что если захочется параллельной обработки партиции то надо делать примерно так как я описал.
хорошо, спасибо, тогда наверно выложу свою поделку, может кто посмотрит скажет где не прав
источник

O

Oleg in pro.kafka
Dmitri Mumber
Всем привет!
Кто-нибудь сталкивался с проблемой установки consumer.max.poll.records и consumer.max.poll.interval.ms для (custom) Kafka Connect коннекторов? Коннектор игнорирует любые значения :( Может кто подскажет альтернативы?
Спасибо!
в коннекторы параметры внутренних консюмеров не прокинуты, к сожалению

PR мой год уже висит, @gamussa доколе? :)
источник

O

Oleg in pro.kafka
Руслан Маркелов
Мы параллельно обрабатываем сообщения вычитанные из партиций кафки в очередь. Обработка так гораздо быстрее чем последовательно. Но при этом для коммита надо складывать обработанные оффсеты в непрерывную последовательность и потом коммитить иначе при восстановлении в случае сбоя не обработанные сообщения могут быть потеряны. Также приходится очень тщательно обрабатывать ошибки и следить за тем что код занимающийся параллельной обработкой не должен зависать. Если одно сообщение зависнет вас может порвать по памяти при накоплении непрерывной последовательности. Готового решения не встречал. Но в доке кафки написано что если захочется параллельной обработки партиции то надо делать примерно так как я описал.
а backpressure не делаете, чтобы зависание не роняло приложение?
источник

AI

Alexander Iskuskov in pro.kafka
Oleg
в коннекторы параметры внутренних консюмеров не прокинуты, к сожалению

PR мой год уже висит, @gamussa доколе? :)
Можно ведь переопределить настройки клиентов https://docs.confluent.io/current/connect/references/allconfigs.html#override-the-worker-configuration
или это про другое?
источник

O

Oleg in pro.kafka
Alexander Iskuskov
Можно ведь переопределить настройки клиентов https://docs.confluent.io/current/connect/references/allconfigs.html#override-the-worker-configuration
или это про другое?
наверно уже вынесли, раньше не было
источник

РМ

Руслан Маркелов... in pro.kafka
Oleg
а backpressure не делаете, чтобы зависание не роняло приложение?
у нас очереди на каждую партицию отдельные и ограниченные
если очередь заполнена перестаем читать партицию пока не разгребется очередь

зависание кода обработчика мы не можем контролировать (там пользовательский код)
поэтому у нас есть таймаут и если в течении этого таймаута код обработчика не возвращает управление то мы просто добавляем ооффсет этого сообщения к последовательности на коммит

тут выбор либо блокировать и выходить изз групппы подписки либо продолжать с подвисшим потоком, мы выбрали продолжать
источник
2020 October 14

ЮН

Юлия Николаева... in pro.kafka
Всем привет)
Кто-то сталкивался с проблемой, что Оффсет не проставляется?
источник

Y

Yuriy in pro.kafka
Юлия Николаева
Всем привет)
Кто-то сталкивался с проблемой, что Оффсет не проставляется?
Подробнее)
источник

V

VTaliKK in pro.kafka
Парни, может вопрос нуббский
источник