Size: a a a

2021 January 29

SB

S B in pro.kafka
Юрий Бадальянц
Это нормальный сценарий
В кровавом энтерпрайзе разве что
источник

ЮБ

Юрий Бадальянц... in pro.kafka
S B
В кровавом энтерпрайзе разве что
Этот вариант даже в доках предложен. Кафка не форсирует хранение офсетов в ней самой. К тому же, хранение офсетов с данными избавляет от проблемы расстнхрона систем в случае падений.
источник

SB

S B in pro.kafka
Юрий Бадальянц
Этот вариант даже в доках предложен. Кафка не форсирует хранение офсетов в ней самой. К тому же, хранение офсетов с данными избавляет от проблемы расстнхрона систем в случае падений.
А, ну тогда да. Если в доках что-то предложено, то аргумент железный.
источник

SB

S B in pro.kafka
Юрий Бадальянц
Этот вариант даже в доках предложен. Кафка не форсирует хранение офсетов в ней самой. К тому же, хранение офсетов с данными избавляет от проблемы расстнхрона систем в случае падений.
Избавляет от рассинхрона? Только если не принимать во внимание разные редкие неприятные ситуации.
источник
2021 January 30

N

Nenormalniy in pro.kafka
S B
Избавляет от рассинхрона? Только если не принимать во внимание разные редкие неприятные ситуации.
чё за ситуации
источник

ЧП

Чёрный Плащ... in pro.kafka
Подозреваю, это ситуации когда произошло падение между фиксацией в системе и в Кафка
источник

SB

S B in pro.kafka
Nenormalniy
чё за ситуации
Атомарная запись сразу в несколько источников: подтверждение офсета и сохранение этого самого офсета в БД с нужным уровнем изоляции.
источник

SB

S B in pro.kafka
За это юзеркод отвечает, причём никакого чёткого протокола там и в помине не будет.
источник

SB

S B in pro.kafka
Короче, разве что для кровавого энтерпрайза это хорошо, все остальное - по крайней мере надо хорошо задуматься, а что и зачем я делаю?
источник

過酸化水素 in pro.kafka
Есть ли какой-нибудь устоявшийся рецепт по засовыванию Kafka за Nat, я конечно сунул, но по какой-то причине у меня ingress сбегает с ноды
источник

過酸化水素 in pro.kafka
Здравствуйте (:
источник

VG

Vik Gamov in pro.kafka
過酸化水素
Есть ли какой-нибудь устоявшийся рецепт по засовыванию Kafka за Nat, я конечно сунул, но по какой-то причине у меня ingress сбегает с ноды
Кубер?
источник

VG

Vik Gamov in pro.kafka
источник

過酸化水素 in pro.kafka
Нет, k3s (: там я в качестве ingress юзаю traefik. И он по какой-то неведомой причине, иногда сбегает.
источник
2021 February 01

AD

Alex Dev in pro.kafka
народ а есть где таблица соотношения версий ksqldb vs  cp-ksqldb
источник

AR

Alexander Ryzhenko in pro.kafka
Доброго времени суток. Пробую играться с ksql
есть топик, в который kafka-connect’ом (debezium mysql) наливаются данные в json формате
создаю стрим

CREATE STREAM o (
OrderID INT KEY,
__version STRING
) WITH (
 kafka_topic = 'orders',
 value_format = 'json'
);


При попытке почитать из него select * from o emit changes
получаю в логах

[2021-01-31 22:28:11,684] WARN Exception caught during Deserialization, taskId: 0_0, topic:  orders, partition: 0, offset: 1729 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic:  orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] WARN stream-thread [_confluent-ksql-default_transient_7690121861292502830_1612132084399-d7889c74-b9f0-42b8-ba9f-03019c4d2db0-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[ orders] partition=[0] offset=[1729] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic:  orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing KAFKA message from topic:  orders","recordB64":null,"cause":["Size of data received by IntegerDeserializer is not 4"],"topic":" orders"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.7690121861292502830.KsqlTopic.Source.deserializer:44)


Если убрать ключ и оставить только OrderID INT, (без KEY) то читается нормально.

То же с таболицами. CREATE TABLE …. при создании с PRIMARY KEY не читается. Ощибки те же. В поле int нормальный, переполнения нет.
ЧЯДНТ?
источник

VG

Vik Gamov in pro.kafka
Alexander Ryzhenko
Доброго времени суток. Пробую играться с ksql
есть топик, в который kafka-connect’ом (debezium mysql) наливаются данные в json формате
создаю стрим

CREATE STREAM o (
OrderID INT KEY,
__version STRING
) WITH (
 kafka_topic = 'orders',
 value_format = 'json'
);


При попытке почитать из него select * from o emit changes
получаю в логах

[2021-01-31 22:28:11,684] WARN Exception caught during Deserialization, taskId: 0_0, topic:  orders, partition: 0, offset: 1729 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic:  orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] WARN stream-thread [_confluent-ksql-default_transient_7690121861292502830_1612132084399-d7889c74-b9f0-42b8-ba9f-03019c4d2db0-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[ orders] partition=[0] offset=[1729] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic:  orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing KAFKA message from topic:  orders","recordB64":null,"cause":["Size of data received by IntegerDeserializer is not 4"],"topic":" orders"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.7690121861292502830.KsqlTopic.Source.deserializer:44)


Если убрать ключ и оставить только OrderID INT, (без KEY) то читается нормально.

То же с таболицами. CREATE TABLE …. при создании с PRIMARY KEY не читается. Ощибки те же. В поле int нормальный, переполнения нет.
ЧЯДНТ?
1) в каком формате льёт дебезиум?
2) какая версия ksqlDB?
3) что пишет print orders from beginning?
источник

AR

Alexander Ryzhenko in pro.kafka
Vik Gamov
1) в каком формате льёт дебезиум?
2) какая версия ksqlDB?
3) что пишет print orders from beginning?
1. JSON. Применен ExtractRowTransformer, так что там без вложеннных структур. Пример соообщения в топике:
{"OrderID":42501768,"__version":"1612153780532575455"}

2.
confluentinc/ksqldb-server:0.14.0
confluentinc/ksqldb-cli:0.14.0

3.
Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/01/29 08:59:23.328 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763189080318"}

rowtime: 2021/01/29 08:59:23.330 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763320490304"}
……
источник

A

Andrey in pro.kafka
Скажите, а есть ли способ автоматического перезапуска stream'а в ksqldb? Ситуация следующая, была амазоновская кафка которая решила пропатчиться, это взяло видать достаточно долгое время и все stream'ы не смогли закомитить оффсеты, после чего зафейлились и не пытались перезапуститься. retention период был достаточно низким, соответсвено когда опомнились, какие-то данные уже пропали.
источник

AR

Alexander Ryzhenko in pro.kafka
Alexander Ryzhenko
1. JSON. Применен ExtractRowTransformer, так что там без вложеннных структур. Пример соообщения в топике:
{"OrderID":42501768,"__version":"1612153780532575455"}

2.
confluentinc/ksqldb-server:0.14.0
confluentinc/ksqldb-cli:0.14.0

3.
Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/01/29 08:59:23.328 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763189080318"}

rowtime: 2021/01/29 08:59:23.330 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763320490304"}
……
Сам докопался, что проблема в KEY_FORMAT.
При создании стрима я его не указываю и он подставляется деволтным KAFKA. Дебезиум же пишет в ключ JSON, например:

{"OrderID":42511627}

но создание с KEY_FORMAT='JSON' приводит к другой ошибке при попытке чтения из стрима:

[2021-02-01 10:41:04,832] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: orders. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","recordB64":null,"cause":["Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","Can't convert type. sourceType: ObjectNode, requiredType: INTEGER"],"topic»:»orders»},»recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.6302525988885084951.KsqlTopic.Source.deserializer:44)
источник