Size: a a a

2021 March 30

AZ

Anton Zadorozhniy in Data Engineers
вообще @renardeinside , тут спрашивают чем  delta engine хорош
источник

e

er@essbase.ru in Data Engineers
Anton Zadorozhniy
то есть это нормальная МРР СУБД, но которая понимает Spark API
ох.. я так понимаю что это все только в облаке и за денежку ?
источник

NN

No Name in Data Engineers
er@essbase.ru
ох.. я так понимаю что это все только в облаке и за денежку ?
Вроде да
источник

UR

Uncle Ruckus in Data Engineers
Yuri Lyulchenko
Кто-нибудь может помочь разобраться как во Flink из топика можно получить записи в формате GenericData (Avro), используя схему из SchemaRegistry? Что-то совсем залип.... ☹️
Получить записи - в датасет?
источник

AZ

Anton Zadorozhniy in Data Engineers
er@essbase.ru
ох.. я так понимаю что это все только в облаке и за денежку ?
это databricks сервис
источник

YL

Yuri Lyulchenko in Data Engineers
Uncle Ruckus
Получить записи - в датасет?
DataStream<GenericRecord>
источник

UR

Uncle Ruckus in Data Engineers
Функцию запилить, десериализационную. DeserializationSchema
источник

UR

Uncle Ruckus in Data Engineers
FlinkKafkaConsumer<GenericRecord> kafkaStream = new FlinkKafkaConsumer<>("mytopic", new AdsFrameDeserializationSchema(), properties);
источник

UR

Uncle Ruckus in Data Engineers
вы же не с самим авро будете работать, а с POJO каким
источник

YL

Yuri Lyulchenko in Data Engineers
Uncle Ruckus
Функцию запилить, десериализационную. DeserializationSchema
А можете глянуть. Вот это выкрал со стековерфлоу:
источник

UR

Uncle Ruckus in Data Engineers
DataStream<GenericRecord> Stream = env.addSource(kafkaStream
источник

YL

Yuri Lyulchenko in Data Engineers
package cep;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;

class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {

   private String registryUrl;
   private transient KafkaAvroDeserializer deserializer;

   public GenericRecordSchema(String registryUrl) {
       this.registryUrl = registryUrl;
   }

   @Override
   public boolean isEndOfStream(GenericRecord nextElement) {
       return false;
   }

   @Override
   public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
       checkInitialized();
       return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
   }

   @Override
   public TypeInformation<GenericRecord> getProducedType() {
       return TypeExtractor.getForClass(GenericRecord.class);
   }

   private void checkInitialized() {
       if (deserializer == null) {
           Map<String, Object> props = new HashMap<>();
           props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
           props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
           SchemaRegistryClient client =
                   new CachedSchemaRegistryClient(
                           registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
           deserializer = new KafkaAvroDeserializer(client, props);
       }
   }
}
источник

YL

Yuri Lyulchenko in Data Engineers
Консьюмер:

 
 private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {

       Schema schema = ReflectData.get().getSchema(User.class);

       return new FlinkKafkaConsumer<>(
               topic,
               ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxxxxxxxxxx:8081"),
               getConsumerProperties());
   }
источник

YL

Yuri Lyulchenko in Data Engineers
При таком раскладе у меня ошибка:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException

Serialization trace:

reserved (org.apache.avro.Schema$Field)

fieldMap (org.apache.avro.Schema$RecordSchema)

schema (org.apache.avro.generic.GenericData$Record)
источник

YL

Yuri Lyulchenko in Data Engineers
Сорри, что медленно инфу кидаю, с удаленного раб. места приходится все стягивать
источник

UR

Uncle Ruckus in Data Engineers
Жесть какая. Я только одно понял, что схему они подтягивают откуда-то снаружи ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://172.17.189.32:8081"),
источник

UR

Uncle Ruckus in Data Engineers
Вопрос, что в чем конкретно бонус - непонятно, класс-то в который сериализуется все равно должен быть локально
источник

UR

Uncle Ruckus in Data Engineers
а так - вот встроенный класс KafkaAvroDeserializer deserializer;
источник

YL

Yuri Lyulchenko in Data Engineers
А вот так будет более правильно?:
    private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {

       Schema schema = ReflectData.get().getSchema(User.class);

       return new FlinkKafkaConsumer<>(
               topic,
               ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://172.17.189.32:8081"),
               getConsumerProperties());
   }
источник

UR

Uncle Ruckus in Data Engineers
Yuri Lyulchenko
А вот так будет более правильно?:
    private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {

       Schema schema = ReflectData.get().getSchema(User.class);

       return new FlinkKafkaConsumer<>(
               topic,
               ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://172.17.189.32:8081"),
               getConsumerProperties());
   }
Да нет, я не понял зачем ее снаружи "http://172.17.189.32:8081" тащить, и что там вообще внутри
источник