Привет!
Подскажите пожалуйста как лучше в spring reactor решить проблему вызова mapping функции на каждом flux.
Суть такая. Есть один publisher который дает стринги и есть клиенты, которые парсят пришедшие строки и фильтруют дто. Я организовал код вот так
val publisher: Flux<String> = ..
val sub1 =
publisher.map{veryExpensiveConverter.convert(it)}
.filter(it.metric<10)
val sub2 =
publisher.map{veryExpensiveConverter.convert(it)}
.filter(it.metric>5)
val sub3 =
sub2.map{cheapConverter.convert(it)}
.filter(it.metric>8)
Flux.merge(sub1, sub2, sub3, ..., subn)
.map{//some logic for following data of subscribers}
.subscribe()
Но получаю что на каждый sub1, sub2, на те же самые входные строки будет вызываться
veryExpensiveConverter
Получается такое
Input1 -> 1) veryExpensiveConverter -> filter1 -> output1
2) -> veryExpensiveConverter -> filter2 -> output2
3) -> veryExpensiveConverter -> cheapConverter -> filter3 -> output3
А хотелось бы такое
Input1 -> veryExpensiveConverter ->
1)-> filter1 -> output1
2)-> filter2 -> output2
3)-> cheapConverter -> filter3 -> output3
Как такое можно провернуть?