A
Size: a a a
A
A
A
A
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")
val cleanFun = scalaClean(function)
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.addSource(cleanFun, typeInfo))
}
A
RK
RK
A
val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
new GenericRecordAvroTypeInfo(schema))
val stream = new DataStream[GenericRecord](javaStream)
A
RK
RK
A
RK
RK
A
AE
AE
K
AE