Такс, только что проверил:
fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
val list = arrayListOf<T>()
launch {
while(isActive) {
delay(millis)
if(list.isNotEmpty()){
send(ArrayList(list))
list.clear()
}
}
}
collect { list.add(it) }
}
private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Main + job)
scope.launch {
foo()
.timedBuffer(1000)
.collect {
print(it)
}
}
Вообщем при отмене scope все завершается, channel тоже отменяется и isActive становится false