Hi everyone — can anyone help me with an error? I'm streaming from Kafka into HDFS, and the code is very simple:
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", source) \
    .option("startingOffsets", startingOffset) \
    .option("failOnDataLoss", "false") \
    .load()

lines \
    .selectExpr("CAST(timestamp AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", checkpoint + "/streaming_data.parquet") \
    .option("checkpointLocation", checkpoint) \
    .start() \
    .awaitTermination()
It fails with this error:
pyspark.sql.utils.StreamingQueryException: Partition monitoring.answers-0's offset was changed from 42 to 32, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

=== Streaming Query ===
Identifier: [id = e8fc7c30-107b-44b7-8928-c68fb86d864a, runId = 03a598e4-e9a7-4f06-864d-95714df93c6c]
Current Committed Offsets: {KafkaSource[Subscribe[monitoring.answers]]: {"monitoring.answers":{"0":42}}}
Current Available Offsets: {KafkaSource[Subscribe[monitoring.answers]]: {"monitoring.answers":{"0":32}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [cast(timestamp#12 as string) AS timestamp#22, cast(value#8 as string) AS value#21]
+- StreamingExecutionRelation KafkaSource[Subscribe[monitoring.answers]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
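What I notice is that the committed offset in the checkpoint (42) is *ahead* of the latest offset Kafka now reports (32). To sanity-check which partitions moved backwards, I compared the two JSON offset maps copied from the error message with a quick standalone script (the helper name is mine, not part of any API):

```python
import json

# Offset maps copied verbatim from the error message above
committed = json.loads('{"monitoring.answers":{"0":42}}')
available = json.loads('{"monitoring.answers":{"0":32}}')

def backward_partitions(committed, available):
    """Return (topic, partition, committed_offset, available_offset) for
    every partition whose available offset is behind the checkpointed one."""
    out = []
    for topic, parts in committed.items():
        for part, committed_off in parts.items():
            available_off = available.get(topic, {}).get(part)
            if available_off is not None and available_off < committed_off:
                out.append((topic, int(part), committed_off, available_off))
    return out

print(backward_partitions(committed, available))
# → [('monitoring.answers', 0, 42, 32)]
```

So only partition 0 is affected, and Kafka is 10 offsets behind what the checkpoint expects.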
Am I right in thinking that the data is lost while writing to HDFS, i.e. it was already read from Kafka and the offset had advanced? If so, how do I fix it? (Note that I already set failOnDataLoss to "false", yet the query still fails.)
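For what it's worth, my working assumption is that the stale offset (42) lives in the checkpoint directory, so the only way I found to get the query running again is to throw the checkpoint away and restart from scratch, accepting that the exactly-once bookkeeping is lost. A sketch of what I plan to try — the path is a local placeholder, my real checkpoint is on HDFS (where I'd use `hdfs dfs -rm -r` instead):

```python
import os
import shutil
import tempfile

# Placeholder standing in for my real HDFS checkpoint directory
checkpoint = os.path.join(tempfile.mkdtemp(), "checkpoint")
os.makedirs(os.path.join(checkpoint, "offsets"))  # simulate existing state

# Drop the whole checkpoint so the query stops comparing Kafka's current
# offsets against the stale committed offset it recorded earlier
shutil.rmtree(checkpoint)
print(os.path.exists(checkpoint))
# → False
```

After that I would restart the stream with startingOffsets set to "earliest" (or "latest", depending on whether reprocessing is acceptable). Is there a less destructive way?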