Size: a a a

2020 January 21

ὦan in pro.kafka
спасибо :)
источник
2020 January 22

D

Dan in pro.kafka
Всем привет, у меня к вам вопрос, мы пытаемся сделать backup ивентов из кафки для топика Х, для этого мы используем S3 sink с использованием avro схемы, т.е. наши данные включая avro схему сохраняются в S3 bucket. Но мы столкнулись со следующией проблемой:
1) io.confluent.connect.s3.S3SinkConnector, не умеет сохранять ключи от ивента, а он нам нужен, т.к. ключ содержит важную для нас информацию
2) Для того, чтобы решить проблемы №1, мы добавили плагин com.github.jcustenborder.kafka.connect.archive.Archive, который сохраняет еще и ключ ивента

И теперь у нас есть ключ ивента, но классический source io.confluent.connect.s3.source.S3SourceConnector не может эти сохранненый данные sinkом распарсить, выбрасывая следующую ошибку:
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class org.apache.avro.generic.GenericData$Record for field: "value"

Any idea?

Заранее благодарю
источник

D

Dan in pro.kafka
Добавление com.github.jcustenborder.kafka.connect.archive.Archive в Source не решает проблему
источник

RS

Roman Sharkov in pro.kafka
я правильнo понимаю, что кафка не предоставляет возможности  принимать сообщения только в том случае, если настоящая длина лога не соответствует той, которую мы ей передадим?

псевдо-код:

err = kafka.push(message, assumedLen)
if err == “length mismatch” {
 // assumedLen isn’t the same as the actual log length
}
источник
2020 January 23

AD

Aleksey Dobrunov in pro.kafka
привет.
прочитал описание нового релиза https://www.confluent.io/blog/introducing-confluent-platform-5-4 , вышедшего вчера. и не особо понял преимущества нового "Multi-Region Clusters" над растянутым. в описании делается упор, что растянутый так себе, это компромиссы.   но можно подумать Multi-Region Clusters не компромиссы ? выход из строя одного ДЦ приводит к потере данных, которые не среплицировались. и как их потом искать? в чём плюс то тогда?
p.sю в описании еще логическая ошибка ", and the broker sends an error indication back to the consumer."  в конце должен быть producer , а не consumer
источник

ὦan in pro.kafka
Я так понимаю что у кафки из коробки нет механизма, чтобы читать из очереди с определенным временным интевалом (eg. каждые 5/10 минут)
И приходится создавать специальные очереди и строить логику по типу, прочитал сообщение -> кинул поток в сон -> ожил и обработал сообщение?
источник

N

Nazar in pro.kafka
Это задачи консьюмера, а не кафки
источник

RS

Roman Sharkov in pro.kafka
Roman Sharkov
я правильнo понимаю, что кафка не предоставляет возможности  принимать сообщения только в том случае, если настоящая длина лога не соответствует той, которую мы ей передадим?

псевдо-код:

err = kafka.push(message, assumedLen)
if err == “length mismatch” {
 // assumedLen isn’t the same as the actual log length
}
я так понимаю что это невозможно? 🙃
источник

A

Anatoly Soldatov in pro.kafka
Aleksey Dobrunov
привет.
прочитал описание нового релиза https://www.confluent.io/blog/introducing-confluent-platform-5-4 , вышедшего вчера. и не особо понял преимущества нового "Multi-Region Clusters" над растянутым. в описании делается упор, что растянутый так себе, это компромиссы.   но можно подумать Multi-Region Clusters не компромиссы ? выход из строя одного ДЦ приводит к потере данных, которые не среплицировались. и как их потом искать? в чём плюс то тогда?
p.sю в описании еще логическая ошибка ", and the broker sends an error indication back to the consumer."  в конце должен быть producer , а не consumer
Растянутый работает отлично до того момента, пока у тебя сеть в пределах 10-20ms между ДЦ (цифры средние, для каждого своё значение latency приемлемо)
Чем выше latency становится, тем дольше идёт репликация, продюсеры дольше пишут (из-за acks), консьюмеры дольше читают
источник

A

Anatoly Soldatov in pro.kafka
Если у тебя один ДЦ в Америке, другой в Европе, latency скорее всего будет заметно влиять на работу кластера
источник

A

Anatoly Soldatov in pro.kafka
Поэтому ты делаешь несколько асинхронно реплицируемых кластеров
источник

NR

Nikita Ryanov in pro.kafka
ὦan
Я так понимаю что у кафки из коробки нет механизма, чтобы читать из очереди с определенным временным интевалом (eg. каждые 5/10 минут)
И приходится создавать специальные очереди и строить логику по типу, прочитал сообщение -> кинул поток в сон -> ожил и обработал сообщение?
Это больше похоже на батчевую обработку и, возможно, вам больше подойдет соответствующий движок для этого, чем стандартный консюмер. Или, как вариант, воспользоваться механизмом pause/resume
источник

AD

Aleksey Dobrunov in pro.kafka
Anatoly Soldatov
Если у тебя один ДЦ в Америке, другой в Европе, latency скорее всего будет заметно влиять на работу кластера
получается, что данное решение предназначено именно для растянутых на большие расстояния кластера.
источник

A

Anatoly Soldatov in pro.kafka
Aleksey Dobrunov
получается, что данное решение предназначено именно для растянутых на большие расстояния кластера.
В общем случае, да
Но у вас могут быть разные топологии
Может быть вы хотите одну центральную кафку и несколько сателитов с частичным набором данных, например. Через stretched это будет неудобно делать
источник

RI

Roman Izutov in pro.kafka
Nikita Ryanov
Это больше похоже на батчевую обработку и, возможно, вам больше подойдет соответствующий движок для этого, чем стандартный консюмер. Или, как вариант, воспользоваться механизмом pause/resume
Можно паузить ВСЕ контейнеры кафки и уходить спать, после сна резюмить их, опять же все. Однако , если брокер не получит от спящего консюмера keepalive сообщения, будет ребалансировка консьюмеров, и вот тут я не знаю, остануться ли контейнеры ждать или все разом заработают(как будто им всем сказали resume)
источник

NR

Nikita Ryanov in pro.kafka
Roman Izutov
Можно паузить ВСЕ контейнеры кафки и уходить спать, после сна резюмить их, опять же все. Однако , если брокер не получит от спящего консюмера keepalive сообщения, будет ребалансировка консьюмеров, и вот тут я не знаю, остануться ли контейнеры ждать или все разом заработают(как будто им всем сказали resume)
Не совсем понял мысль про контейнеры Кафки. Я имел в виду непосредственно консюмера в коде, которого можно поставить на паузу и пулить сообщения из брокера в холостую
источник

RI

Roman Izutov in pro.kafka
Nikita Ryanov
Не совсем понял мысль про контейнеры Кафки. Я имел в виду непосредственно консюмера в коде, которого можно поставить на паузу и пулить сообщения из брокера в холостую
Т.е. часть сообщений обрабатывать в холостую а часть нет? В чем смысл?
источник

MU

Max Underwood in pro.kafka
Есть неприятная история с таймаутами вида
ApplicationMsg=Error occurred at sending an event to the bus: error='Expiring 2 record(s) for events-52: 5171 ms has passed since batch creation plus linger time'
при этом:
происходит раз-два на две недели (порой вовсе пропадает на месяца)
в один момент времени проблема появляется только на одно продюсере. не было еще такого что бы в одно и то же время была проблема и на других продюсерах)
нагрузки на брокерах и на продюсерах в это время вовсе может не быть
таймаутится отправка на часть партиций одно брокера
ретраи тоже падают

Apache Kafka 0.10.2.1

3 брокера (игрались с разными настройками но в виду безрезультатоности вернулись к почти дефолтным настрокам)
проблема с самым активным топиком PartitionCount:64 ReplicationFactor:3
при это проблема возникает у продюсеров с размером месседжей выше среднего (средний менее 500 байт, максимальный 40k байт)
скорость записи в среднем 20 messages per second (максималка за прошлый месяц до 200)

max.in.flight.requests.per.connection=1
max.block.ms=3000
metadata.max.age.ms=1000
request.timeout.ms=5000
retries=10
buffer.memory=104857600
batch.size=65536
acks=1

по рейту сообщений - он разный, когда стартовали подбирали эти настройки под именно под работу продюсеров и пару лет жили без проблем с таймаутами

Может кто что сможет посоветовать?

P.S. как бы не хотелось списать на проблемы сети (ec2/vpc) не получается отловить проблему (продюсеров много и они в разных AWS аккаунтах). Да и происходит это редко. Кластер не под нагрузкой. Специфика задачи скорее большое количество консюмеров чем продюсеров. Топик __consumers_offsets вынесен на отдельные три ноды так как его использование значительно выше основного продуктового топика.
источник

NR

Nikita Ryanov in pro.kafka
Roman Izutov
Т.е. часть сообщений обрабатывать в холостую а часть нет? В чем смысл?
Нет, консьюмер вообще не будет получать Новые сообщения пока находится в режиме паузы
источник

NR

Nikita Ryanov in pro.kafka
Хотя вроде бы сейчас (с какой-то из версий) даже это не обязательно, так как поток, отсылающий хертбиты теперь независим и poll не является триггером его отправки
источник