Вот что получилось, я проверил, теперь потокобезопасно:
fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
val list = arrayListOf<T>()
val mutex = Mutex()
launch {
while(isActive) {
delay(millis)
if(list.isNotEmpty()){
mutex.withLock(list) {
send(ArrayList(list))
list.clear()
}
}
}
}
collect { mutex.withLock(list) { list.add(it) } }
}
Поправьте, если что-то не так, я с Mutex не работал, но по тому что прочитал у него подход как в Java synchronized