Size: a a a

2019 November 14

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
Может проще вообще схему передать?
может, но мне надо так сделать, потом можно и схему
источник

DZ

Dmitry Zuev in Data Engineers
Зочем?
источник

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
Зочем?
надо добить
источник

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
Зочем?
а вообще по прваославному надо со схемой в спарке работать?
источник

AS

Anton Shelin in Data Engineers
David Manukian
Дессерализацию ручками надо делать или возможно есть какой-то механизм который возвращает просто GenericRecord?
я не експерт но например в датабрикс есть возможность работать с конфлюент регистром
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", servers)
 .option("subscribe", "t")
 .load()
 .select(
   from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
   from_avro($"value", "t-value", schemaRegistryAddr).as("value"))


Если у вас не датабрикс то думаю можно просто при селекте вызывать руками десериализатор и потом делать флаттенинг
источник

DZ

Dmitry Zuev in Data Engineers
Ilya Pribytkov
а вообще по прваославному надо со схемой в спарке работать?
ну если у тебя csv то логично
источник

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
ну если у тебя csv то логично
ладно на схему переделаю
источник

DM

David Manukian in Data Engineers
@anton_shelin да дело в том что с конфлюентом по проще, он все таки популярнее, хоть и платный. У меня же Hortonworks, я изначально в кафке properties выставил ключ и значения, не уверен, но по идее должно заработать
источник

DM

David Manukian in Data Engineers
@anton_shelin ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "server",
ConsumerConfig.
GROUP_ID_CONFIG -> UUID.randomUUID().toString,
ConsumerConfig.
KEY_DESERIALIZER_CLASS_CONFIG -> "StringDeserializer",
ConsumerConfig.
VALUE_DESERIALIZER_CLASS_CONFIG -> "KafkaAvroDeserializer",
ConsumerConfig.
AUTO_OFFSET_RESET_CONFIG -> "latest",
SchemaRegistryClient.Configuration.
SCHEMA_REGISTRY_URL.name() -> "schema"
источник

AS

Anton Shelin in Data Engineers
David Manukian
@anton_shelin ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "server",
ConsumerConfig.
GROUP_ID_CONFIG -> UUID.randomUUID().toString,
ConsumerConfig.
KEY_DESERIALIZER_CLASS_CONFIG -> "StringDeserializer",
ConsumerConfig.
VALUE_DESERIALIZER_CLASS_CONFIG -> "KafkaAvroDeserializer",
ConsumerConfig.
AUTO_OFFSET_RESET_CONFIG -> "latest",
SchemaRegistryClient.Configuration.
SCHEMA_REGISTRY_URL.name() -> "schema"
к сожалению с кафкой не работал. с авро вариантов ровно 3. 1. использовать контейнер чтобы схема была внутри сообщения. но это плохо так как схема занимает много места. может спасти если писать батчем 2. использовать схема регистри тут можно просто создать десериализатор руками из схемы но будет проблема если схема поменяется 3. использовать single object encoding тогда в начале каждого сообщения будет маркер схемы это то что надо. можно тогда руками написать десериализатор и дергать его этот путь наиболее удобен если нет решений из коробки. в общем вопрос у вас в кафку вы пишете в каком формате avro?
источник

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
ну если у тебя csv то логично
из датафрейма можно схему сделать, задав колонки которые нужны?
источник

RI

Rustam Iksanov in Data Engineers
Ilya Pribytkov
из датафрейма можно схему сделать, задав колонки которые нужны?
датафрейм может быть валидирован схемой, а может и нет. Просто задание схемы сразу говорит, как кастить колонки в типы.
источник

IP

Ilya Pribytkov in Data Engineers
Rustam Iksanov
датафрейм может быть валидирован схемой, а может и нет. Просто задание схемы сразу говорит, как кастить колонки в типы.
Короче прошлый вариант заработал, я спросто название колонки написал неверно, ка обычно же
источник

DM

David Manukian in Data Engineers
@anton_shelin В любом случае надо дессериализатор писать если используешь схему реджистри, в случае конфлюента надо брать 4 байта (id схемы), а в случае хортона это 2 байта, но хортон не умеет возвращать только по айди, надо еще версию, поэтому если сериазация была через хортон тоже, то в пейлоде после айди 9 байтов идут схема версии
источник

DZ

Dmitry Zuev in Data Engineers
Ilya Pribytkov
Короче прошлый вариант заработал, я спросто название колонки написал неверно, ка обычно же
Если у тебя csv без шапки ещё и названия колонок
источник

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
Если у тебя csv без шапки ещё и названия колонок
там с названием, в на стройках же прописываешь считать ли первую строку шакой, типо тру, просто посто неверно
источник

IP

Ilya Pribytkov in Data Engineers
написал
источник

DZ

Dmitry Zuev in Data Engineers
Я не понял что ты сказал
источник

IP

Ilya Pribytkov in Data Engineers
Dmitry Zuev
Я не понял что ты сказал
да, не парься, все равно я тебе пока, ничего нового не раскажу)))
источник

AS

Anton Shelin in Data Engineers
David Manukian
@anton_shelin В любом случае надо дессериализатор писать если используешь схему реджистри, в случае конфлюента надо брать 4 байта (id схемы), а в случае хортона это 2 байта, но хортон не умеет возвращать только по айди, надо еще версию, поэтому если сериазация была через хортон тоже, то в пейлоде после айди 9 байтов идут схема версии
Тогда можно взять это за пример и сделать свою функцию. которая будет парсить первые байты и брать схему из registry, надо правда еще с кешированием заморачиваться

import org.apache.spark.sql.avro._

// from_avro requires Avro schema in JSON string format.

val jsonFormatSchema = sparkSchema(name) или sparkSchema(schemaName: String, version: Int)//берем схему из registry
val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribe", "topic1")
 .load()


val output = df
 .select(from_avro('value, jsonFormatSchema) as 'obj)
источник