Size: a a a

Kotlin Community

2020 April 07

BP

Bogdan Panchenko in Kotlin Community
Oleksii Skakun
Я тоже так думаю, сейчас думаю как это правильно сделать
сделать атомик флаг, и  повесить слушатель на окончание исходного флов, вроде ьыл такой
источник

AN

Alexander Nozik in Kotlin Community
Bogdan Panchenko
сделать атомик флаг, и  повесить слушатель на окончание исходного флов, вроде ьыл такой
А почему не повесить collect в конец без ланча и сделать на нем try-catch
источник

BP

Bogdan Panchenko in Kotlin Community
Alexander Nozik
А почему не повесить collect в конец без ланча и сделать на нем try-catch
я про цикл скорей
источник

BP

Bogdan Panchenko in Kotlin Community
он ведь никогда не кончится, или это так и задумано ?
источник

AN

Alexander Nozik in Kotlin Community
Bogdan Panchenko
я про цикл скорей
А зачем? Флаг-то уже есть isActive назвыается, если уж очень нужно
источник

AN

Alexander Nozik in Kotlin Community
Bogdan Panchenko
он ведь никогда не кончится, или это так и задумано ?
он обрушится вместе со скоупом
источник

BP

Bogdan Panchenko in Kotlin Community
Alexander Nozik
А зачем? Флаг-то уже есть isActive назвыается, если уж очень нужно
он есть у флов ?
источник

AN

Alexander Nozik in Kotlin Community
В channelFlow есть
источник

BP

Bogdan Panchenko in Kotlin Community
Alexander Nozik
В channelFlow есть
так речь про исходный флов, он ведь может перестать емитить, как узнать каналу что все хватит ?
источник

AN

Alexander Nozik in Kotlin Community
Bogdan Panchenko
так речь про исходный флов, он ведь может перестать емитить, как узнать каналу что все хватит ?
Вот тут я уже навскидку не отвечу. По хорошему там контекста взад прокидывается, и если корутина упадет, он дожен само-отмениться, но голову на отсечение не дам
источник

BP

Bogdan Panchenko in Kotlin Community
или ук них общий скоуп ?
источник

OS

Oleksii Skakun in Kotlin Community
Такс, только что проверил:

fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
   
val list = arrayListOf<T>()

   launch {
       
while(isActive) {
           delay(millis)

           if(list.isNotEmpty()){
               send(ArrayList(list))
               list.clear()
           }
       }
   }
   
collect { list.add(it) }
}


private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Main + job)

scope.launch {
   
foo()
       .timedBuffer(1000)
       .collect {
           
print(it)
       }
}


Вообщем при отмене scope все завершается, channel тоже отменяется и isActive становится false
источник

AN

Alexander Nozik in Kotlin Community
private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Main + job)
Зачем такая штука? Просто supervisorScope нельзя?
источник

AN

Alexander Nozik in Kotlin Community
Oleksii Skakun
Такс, только что проверил:

fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
   
val list = arrayListOf<T>()

   launch {
       
while(isActive) {
           delay(millis)

           if(list.isNotEmpty()){
               send(ArrayList(list))
               list.clear()
           }
       }
   }
   
collect { list.add(it) }
}


private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Main + job)

scope.launch {
   
foo()
       .timedBuffer(1000)
       .collect {
           
print(it)
       }
}


Вообщем при отмене scope все завершается, channel тоже отменяется и isActive становится false
Там косяк с синхронизацией. Есть риск потерять элемент
источник

AN

Alexander Nozik in Kotlin Community
Может быть ситуация, когда элемент добавился прямо перед clear
источник

OS

Oleksii Skakun in Kotlin Community
Я сейчас думаю как это исправить, у меня такая штука может быть 100%
источник

AN

Alexander Nozik in Kotlin Community
Oleksii Skakun
Я сейчас думаю как это исправить, у меня такая штука может быть 100%
Возьмите мутекс из моего примера
источник

ПГ

Павло Гриник in Kotlin Community
Oleksii Skakun
Такс, только что проверил:

fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
   
val list = arrayListOf<T>()

   launch {
       
while(isActive) {
           delay(millis)

           if(list.isNotEmpty()){
               send(ArrayList(list))
               list.clear()
           }
       }
   }
   
collect { list.add(it) }
}


private val job = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Main + job)

scope.launch {
   
foo()
       .timedBuffer(1000)
       .collect {
           
print(it)
       }
}


Вообщем при отмене scope все завершается, channel тоже отменяется и isActive становится false
Поставьте после
`.collect { print(it) }`
любую suspend-функцию и посмотрите, выполнится ли она
источник

ПГ

Павло Гриник in Kotlin Community
А разве аналогичный буфер из rx фильтрует пустые списки?
источник

OS

Oleksii Skakun in Kotlin Community
Функция не выполниться, потому как при отмене, collect выбрасывает kotlinx.coroutines.JobCancellationException:
источник