Всем привет!
Вопрос на ночь глядя, хотя я сам уже спать пошел
Как заставить работать кафка стриминг в режиме мультитрейдинга в ассинхронном режиме?
Например вот такой вот кусок кода:
val nConsumers: Int = 4
val consumerList: IndexedSeq[KafkaConsumer[String, String]] = Range(0, nConsumers).map(x => consumer)
while(true){
val futuresList: List[Future[TrieMap[String, TrieMap[String, String]]]] =
consumerList.map(x => Future{getKafkaData(resultMap, x)}).toList
val listFutures: Future[List[TrieMap[String, TrieMap[String, String]]]] = Future.sequence(futuresList)
Await.result(listFutures, concurentDuration("60 seconds"))
}
Причем запуская этот код в джупитере, я наблюдаю как он прекрасно работает, обгоняя сильно очень толстый по трафику топик, НО запуская этот же код в собранном джарнике ловлю ошибку что кафка не умеет в мультитрединге
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access