AZ
Size: a a a
AZ
e
NN
UR
AZ
YL
UR
UR
UR
YL
UR
YL
package cep;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.Map;
/**
 * {@link KafkaDeserializationSchema} that decodes the value of each Kafka record into an Avro
 * {@link GenericRecord} by delegating to a {@link KafkaAvroDeserializer}.
 *
 * <p>Only the schema registry URL is stored eagerly. The deserializer field is {@code transient}
 * and is built lazily the first time {@link #deserialize(ConsumerRecord)} runs.
 */
class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {

    /** URL handed to both the deserializer configuration and the schema registry client. */
    private String registryUrl;

    /** Created on first use by {@link #ensureDeserializer()}; {@code null} until then. */
    private transient KafkaAvroDeserializer deserializer;

    /**
     * @param registryUrl schema registry URL used when the deserializer is eventually created
     */
    public GenericRecordSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    /**
     * Always reports that the stream has not ended.
     *
     * @param nextElement the most recently deserialized element (ignored)
     * @return {@code false} unconditionally
     */
    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    /**
     * Decodes the record value with the lazily created deserializer; the record key is not read.
     *
     * @param consumerRecord raw Kafka record whose topic and value are passed to the deserializer
     * @return the deserializer's result cast to {@link GenericRecord}
     * @throws Exception if the underlying deserializer fails
     */
    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        ensureDeserializer();
        Object decoded = deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
        return (GenericRecord) decoded;
    }

    /**
     * @return type information extracted for {@link GenericRecord}
     */
    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        TypeInformation<GenericRecord> producedType = TypeExtractor.getForClass(GenericRecord.class);
        return producedType;
    }

    /** Builds the deserializer once; later calls return immediately. */
    private void ensureDeserializer() {
        if (deserializer != null) {
            return;
        }

        Map<String, Object> config = new HashMap<>();
        config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        // Generic (not specific) Avro reader, matching the GenericRecord cast in deserialize().
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);

        SchemaRegistryClient registryClient =
            new CachedSchemaRegistryClient(
                registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);

        deserializer = new KafkaAvroDeserializer(registryClient, config);
    }
}
YL
/**
 * Creates a {@link FlinkKafkaConsumer} for the given topic that emits {@link GenericRecord}
 * values.
 *
 * <p>The Avro schema is obtained reflectively from {@code User} and passed, together with the
 * hard-coded schema registry URL, to
 * {@code ConfluentRegistryAvroDeserializationSchema.forGeneric}. Consumer settings come from
 * {@code getConsumerProperties()}.
 *
 * @param topic topic name handed to the consumer
 * @return the newly constructed consumer
 */
private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
    Schema userSchema = ReflectData.get().getSchema(User.class);
    ConfluentRegistryAvroDeserializationSchema<GenericRecord> valueSchema =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(userSchema, "http://xxxxxxxxxxx:8081");
    return new FlinkKafkaConsumer<>(topic, valueSchema, getConsumerProperties());
}
YL
YL
UR
UR
UR
YL
/**
 * Creates a {@link FlinkKafkaConsumer} for the given topic that emits {@link GenericRecord}
 * values.
 *
 * <p>The Avro schema is obtained reflectively from {@code User} and passed, together with the
 * hard-coded schema registry URL, to
 * {@code ConfluentRegistryAvroDeserializationSchema.forGeneric}. Consumer settings come from
 * {@code getConsumerProperties()}.
 *
 * @param topic topic name handed to the consumer
 * @return the newly constructed consumer
 */
private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
    Schema userSchema = ReflectData.get().getSchema(User.class);
    ConfluentRegistryAvroDeserializationSchema<GenericRecord> valueSchema =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(userSchema, "http://172.17.189.32:8081");
    return new FlinkKafkaConsumer<>(topic, valueSchema, getConsumerProperties());
}
UR
/**
 * Builds the Kafka source for {@code topic}, producing Avro {@link GenericRecord} elements.
 *
 * <p>Steps: derive a schema from {@code User} via {@code ReflectData}, wrap it with the fixed
 * registry URL in a {@code ConfluentRegistryAvroDeserializationSchema}, then construct the
 * consumer with the properties returned by {@code getConsumerProperties()}.
 *
 * @param topic topic name handed to the consumer
 * @return the newly constructed consumer
 */
private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
    Schema reflectedSchema = ReflectData.get().getSchema(User.class);
    ConfluentRegistryAvroDeserializationSchema<GenericRecord> registryBackedSchema =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            reflectedSchema, "http://172.17.189.32:8081");
    return new FlinkKafkaConsumer<>(topic, registryBackedSchema, getConsumerProperties());
}