Size: a a a

2020 October 29

p

promzeus in pro.kafka
источник

lk

leonid khomenko in pro.kafka
promzeus
кто нибудь работал с divolte ?
ситуация такая при перезагрузке kafka, divolte перестает видеть ее и висит в цикле с сообщением
WARN  [NetworkClient]: [Producer clientId=divolte.collector] Connection to node 0 could not be established. Broker may not be available.
А вы проверяли просто доступность Кафки в такие моменты с поды? По днс/IP
источник

lk

leonid khomenko in pro.kafka
И что значит перезагрузка ? Перезагрузка брокера? Всего кластера? Пересоздание Кафка брокера у которого нет персистенс диска?
источник

p

promzeus in pro.kafka
leonid khomenko
А вы проверяли просто доступность Кафки в такие моменты с поды? По днс/IP
Да проверял в данный момент кафка не является кластером а поднята как один стейтфулсет инстанс, имеет PVC и PV, перезагрузка кафки подразумевает удаление пода kafka и его переподнятие автоматом. nslookup показывает что ип у кафки не меняется! в тоже время другие клиенты к примеру clickhouse переподключаются к кафке без проблем сами. наблюдается проблема пока только с Divolte
источник

ER

Evgeny Rachlenko in pro.kafka
@gamussa Огромное спасибо .
источник

lk

leonid khomenko in pro.kafka
promzeus
Да проверял в данный момент кафка не является кластером а поднята как один стейтфулсет инстанс, имеет PVC и PV, перезагрузка кафки подразумевает удаление пода kafka и его переподнятие автоматом. nslookup показывает что ип у кафки не меняется! в тоже время другие клиенты к примеру clickhouse переподключаются к кафке без проблем сами. наблюдается проблема пока только с Divolte
Use case описанный в примерах закрывается нативным kafka connect . Возможно, divolte требует или ему нужен рестарт соединения при разрывах для создания клианта
источник

A

Alik in pro.kafka
всем привет, никто не подскажет, как бороться с ошибкой, я делаю стриминг из кафки в hdfs, код очень прост:
lines = spark \
       .readStream \
       .format("kafka") \
       .option("kafka.bootstrap.servers", kafka_servers) \
       .option("subscribe", source) \
       .option('startingOffsets', startingOffset) \
       .option('failOnDataLoss', 'false') \
       .load()
   lines \
     .selectExpr("CAST(timestamp AS STRING)", "CAST(value AS STRING)") \
     .writeStream \
     .format("parquet") \
     .outputMode("append") \
     .option("path", checkpoint + "/streaming_data.parquet") \
     .option('checkpointLocation', checkpoint) \
     .start() \
     .awaitTermination()

вылетает ошибка:
pyspark.sql.utils.StreamingQueryException: u'Partition monitoring.answers-0\'s offset was changed from 42 to 32, some data may have been missed. \nSome data may have been lost because they are not available in Kafka any more; either the\n data was aged out by Kafka or the topic may have been deleted before all the data in the\n topic was processed. If you don\'t want your streaming query to fail on such cases, set the\n source option "failOnDataLoss" to "false".\n    \n=== Streaming Query ===\nIdentifier: [id = e8fc7c30-107b-44b7-8928-c68fb86d864a, runId = 03a598e4-e9a7-4f06-864d-95714df93c6c]\nCurrent Committed Offsets: {KafkaSource[Subscribe[monitoring.answers]]: {"monitoring.answers":{"0":42}}}\nCurrent Available Offsets: {KafkaSource[Subscribe[monitoring.answers]]: {"monitoring.answers":{"0":32}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [cast(timestamp#12 as string) AS timestamp#22, cast(value#8 as string) AS value#21]\n+- StreamingExecutionRelation KafkaSource[Subscribe[monitoring.answers]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'

Правильно ли я понимаю, что потеря данных происходит на загрузке данных в hdfs, но данные из кафки он уже считал и offset сдвинулся? Если так то как это решить
источник

ER

Evgeny Rachlenko in pro.kafka
Всем привет. Возник вопрос как правильно готовить kafka для kubernetes. Было бы здорово почитать  рекомендации. Все у меня работает, но уверенности пока нет. Особенно беспокоят микро-сервисы которые выкатываясь вместе с kafka в kubernetes иногда застревают при первом обращении к ней  и не отмерзают, пока не сделаешь им рестарт. То ли кафка не успевает подняться так же быстро как они , и им бы нужно было бы прописать dependencies или как то по другому научить ожидать. То ли самой кафке нужно прописать какой нибудь readiness check. Как  правильно решают такие конфликты ?
источник

В

Вячеслав in pro.kafka
Ну да, все верно. Readiness на кафку + dependencies на зависимые сервисы.
источник

В

Вячеслав in pro.kafka
Причём не "то ли/то ли" а и то и другое, иначе не будет работать. :)
источник

ЮХ

Юра Ходырев... in pro.kafka
Всем привет.
Люди добрые помогите

Есть cp-kafka-rest -> Запись в кафку с передачей ID Avro схемы

После этого данные вычитывает cp-kafka-connect/ JDBC Sink и он не подхватывает схему, по которой необходимо провести де-сереализацию. Поле, которое должно быть timestamp пытается вставить в СУБД как long

Вроде бы в конфигах создаваемого коннектора указал:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url= {{ SCHEMA_URL }}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url= {{ SCHEMA_URL }}

Вот такая ошибка выходит в логах коннектора:
ERROR: column "create_time" is of type timestamp without time zone but expression is of type bigint
источник

ER

Evgeny Rachlenko in pro.kafka
Вячеслав
Ну да, все верно. Readiness на кафку + dependencies на зависимые сервисы.
:) отлично . Огромное спасибо Вячеслав.  А есть какой-нибудь checklist ?  просто посмотреть не забыл ли я еще какие-нибудь еще грабли в углу.
источник

В

Вячеслав in pro.kafka
Evgeny Rachlenko
:) отлично . Огромное спасибо Вячеслав.  А есть какой-нибудь checklist ?  просто посмотреть не забыл ли я еще какие-нибудь еще грабли в углу.
Я уверен, что где-то должен быть, но у меня нету. :)
источник

ER

Evgeny Rachlenko in pro.kafka
Вячеслав
Я уверен, что где-то должен быть, но у меня нету. :)
Если @gamussa  напишет такой документ,  его будут цитировать повсеместно :) Еще раз спасибо .
источник

ЮХ

Юра Ходырев... in pro.kafka
Никто не использует одновременно cp-kafka-rest и cp-kafka-connect?
источник

OP

O. Petr in pro.kafka
а в чем конфликт ?
источник

OP

O. Petr in pro.kafka
инописания timestamp разные в версиях авро, в 1.7 и в 1.9
источник

ЮХ

Юра Ходырев... in pro.kafka
O. Petr
а в чем конфликт ?
Есть схема в которой определено поле:
   {
     "name": "create_time",
     "type": {
       "type": "long",
       "connect.version": 1,
       "connect.name": "org.apache.kafka.connect.data.timestamp",
       "logicaltype": "timestamp-millis"
     }
   }

Данные в топик складываются с ID данной схемы.
Через консоль авро консюмером данная запись читается, но при попытке загрузить ее в СУБД Postgres с помощью JDBC Sink коннектора возникает ошибка.
В БД поле timestamp, а коннектор пытается подсунуть bigint
источник

ЮХ

Юра Ходырев... in pro.kafka
O. Petr
инописания timestamp разные в версиях авро, в 1.7 и в 1.9
На данный момент на всех компонентах используются конфлюент контейнеры одной версии
источник

ЮХ

Юра Ходырев... in pro.kafka
версия: 5.4.0
источник