Size: a a a

Kotlin Community

2020 April 07

OS

Oleksii Skakun in Kotlin Community
Это нормально
источник

ПГ

Павло Гриник in Kotlin Community
а если не отменять?
источник

OS

Oleksii Skakun in Kotlin Community
После того как последний обьекте залетел в collect, она выполнилась
источник

ПГ

Павло Гриник in Kotlin Community
странно
я тогда не понимаю почему так
источник

OS

Oleksii Skakun in Kotlin Community
Тут наверно нужно прояснить то находится в foo()
источник

OS

Oleksii Skakun in Kotlin Community
fun foo(): Flow<Int> = flow {
   
for (i in 1..20) {
       delay(500) // pretend we are asynchronously waiting 100 ms
       emit(i) // emit next value
   }
}
источник

OS

Oleksii Skakun in Kotlin Community
При таком раскладе метод который вызывается после collect , он вызовется
источник

ПГ

Павло Гриник in Kotlin Community
Oleksii Skakun
Тут наверно нужно прояснить то находится в foo()
не, как раз с foo проблем нету
источник

ПГ

Павло Гриник in Kotlin Community
не понимаю как channelFlow финиширует без явного на то указания
источник

OS

Oleksii Skakun in Kotlin Community
Вот что получилось, я проверил, теперь потокобезопасно:
fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
   
val list = arrayListOf<T>()
   val mutex = Mutex()

   launch {
       
while(isActive) {
           delay(millis)

           if(list.isNotEmpty()){

               mutex.withLock(list) {
                   
send(ArrayList(list))
                   list.clear()
               }
           
}
       }
   }

   
collect { mutex.withLock(list) { list.add(it) } }
}


Поправьте, если что-то не так, я с Mutex не работал, но по тому что прочитал у него подход как в Java synchronized
источник

RE

Roman Elizarov in Kotlin Community
Работать будет. Но не очень хорошо что mutex заблокирован пока работает send. Я бы на JVM написал так: https://gist.github.com/elizarov/896f2c066c8841377f9db0b1f0a90f96
источник

OS

Oleksii Skakun in Kotlin Community
@relizarov Понял, спасибо большое))
источник

RE

Roman Elizarov in Kotlin Community
Но тут еще есть проблема, что может быть вам нужно чтобы она останавливалась когда исходный flow закончился и может даже высылала бы "остаток" в буфере — обновил gist
источник

AN

Alexander Nozik in Kotlin Community
Roman Elizarov
Но тут еще есть проблема, что может быть вам нужно чтобы она останавливалась когда исходный flow закончился и может даже высылала бы "остаток" в буфере — обновил gist
А чем JVM синхронизация лучше мутекса? Не говоря уже о том, что достаточно обернуть только clear, потому что send состояние не меняет.
источник

RE

Roman Elizarov in Kotlin Community
В данном случае с JVM синхронизацией код будет проще, короче и будет работать с меньшими накладными расходами и быстрей (хоть наверняка производительность не важна в данном случае, но у меня тут профессиональная деформация спортивного программиста и автора низкоуровневых библиотек).
источник

AN

Alexander Nozik in Kotlin Community
Roman Elizarov
В данном случае с JVM синхронизацией код будет проще, короче и будет работать с меньшими накладными расходами и быстрей (хоть наверняка производительность не важна в данном случае, но у меня тут профессиональная деформация спортивного программиста и автора низкоуровневых библиотек).
Проф. деформация это понятно. Не понятно за счет чего дороже. За счет суспенда? А он сильно дешевле чем лок треда?
источник

RE

Roman Elizarov in Kotlin Community
Ну и базовое правило любого concurrent кода — вся синхронизация должна быть инкапсулирована (поэтому отдельный класс Buffer).
источник

RE

Roman Elizarov in Kotlin Community
Будет сильно дешевле, да.
источник

AN

Alexander Nozik in Kotlin Community
Roman Elizarov
Будет сильно дешевле, да.
Учту, спасибо. Меня правда не спасет, потому что все на мультиплатформе
источник

RE

Roman Elizarov in Kotlin Community
Но чтобы попасть в ситуацию когда это важно надо очень постараться. И в этом случае у вас наверняка другие проблемы
источник