Size: a a a

2021 March 29

Y

Ya Anna in Data Engineers
Что то на татарском
источник

SS

Sergey Sheremeta in Data Engineers
дяденьки, здравствуйте!
помогите понять как происходит одновременный вызов ограниченного batchSize'ом кол-ва запросов  в этом куске кода :
это какая-то шибко сложная для меня магическая магия ((

object ThreadedConcurrentContext {
 import scala.util._
 import scala.concurrent._
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.Duration._
 import scala.concurrent.ExecutionContext.Implicits.global

 /** Wraps a code block in a Future and returns the future */
 def executeAsync[T](f: => T): Future[T] = {
   Future(f)(ec)
 }

 /** Awaits only a set of elements at a time. Instead of waiting for the entire batch
   * to finish waits only for the head element before requesting the next future*/
 def awaitSliding[T](it: Iterator[Future[T]], batchSize: Int = 3, timeout: Duration = Inf): Iterator[T] = {
   val slidingIterator = it.sliding(batchSize - 1).withPartial(true) //Our look ahead (hasNext) will auto start the nth future in the batch
   val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
   initIterator.map( futureBatch => Await.result(futureBatch.head, timeout)) ++
     tailIterator.flatMap( lastBatch => Await.result(Future.sequence(lastBatch), timeout))
 }
}
источник

SS

Sergey Sheremeta in Data Engineers
источник

MY

Maxim Yastremsky in Data Engineers
Pavel
Закопать его
А почему pentaho надо закапывать? Не вписывается в современный стек?
источник

P

Pavel in Data Engineers
Maxim Yastremsky
А почему pentaho надо закапывать? Не вписывается в современный стек?
Есть более стандартные и трендовые инструменты
источник

MY

Maxim Yastremsky in Data Engineers
у меня на горизонте маячит задача "рассмотреть альтернативы Oracle data integrator". Т.е. почти полностью накуренный ETL на RDBMS. Что сейчас стоит смотреть?
источник

AZ

Anton Zadorozhniy in Data Engineers
Maxim Yastremsky
у меня на горизонте маячит задача "рассмотреть альтернативы Oracle data integrator". Т.е. почти полностью накуренный ETL на RDBMS. Что сейчас стоит смотреть?
А чем ODI не устраивает?
источник

MY

Maxim Yastremsky in Data Engineers
там по факту вообще стоит Warehouse Builder. Вот с него надо переехать. Предположительно на ODI. Но т.к. все равно много боли не избежать - хотелось попутно рассмотреть варианты. Может дешевле/универсальнее получится.
источник

AS

Andrey Smirnov in Data Engineers
тут магия что следующий батч будет запущен после того как завершится первый таск из предыдущего батча, ты про это?
источник

MY

Maxim Yastremsky in Data Engineers
например, если будет удобная заливка в/из Hive в том же инструменте
источник

EK

Evgenii Kuznetcov in Data Engineers
Sergey Sheremeta
дяденьки, здравствуйте!
помогите понять как происходит одновременный вызов ограниченного batchSize'ом кол-ва запросов  в этом куске кода :
это какая-то шибко сложная для меня магическая магия ((

object ThreadedConcurrentContext {
 import scala.util._
 import scala.concurrent._
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.Duration._
 import scala.concurrent.ExecutionContext.Implicits.global

 /** Wraps a code block in a Future and returns the future */
 def executeAsync[T](f: => T): Future[T] = {
   Future(f)(ec)
 }

 /** Awaits only a set of elements at a time. Instead of waiting for the entire batch
   * to finish waits only for the head element before requesting the next future*/
 def awaitSliding[T](it: Iterator[Future[T]], batchSize: Int = 3, timeout: Duration = Inf): Iterator[T] = {
   val slidingIterator = it.sliding(batchSize - 1).withPartial(true) //Our look ahead (hasNext) will auto start the nth future in the batch
   val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
   initIterator.map( futureBatch => Await.result(futureBatch.head, timeout)) ++
     tailIterator.flatMap( lastBatch => Await.result(Future.sequence(lastBatch), timeout))
 }
}
Тоже не могу понять, что происходит, хотя сам писал подобные вещи.
Это библиотечный код, который надо понять, или свой код, который можно переписать?
источник

SS

Sergey Sheremeta in Data Engineers
Evgenii Kuznetcov
Тоже не могу понять, что происходит, хотя сам писал подобные вещи.
Это библиотечный код, который надо понять, или свой код, который можно переписать?
это отсюда - Concurrency-In-Spark. наверное, можно считать библиотечным кодом
источник

SS

Sergey Sheremeta in Data Engineers
Andrey Smirnov
тут магия что следующий батч будет запущен после того как завершится первый таск из предыдущего батча, ты про это?
да я в целом картину не ухватываю. не могу ментальную модель происходящего построить
источник

SS

Sergey Sheremeta in Data Engineers
было бы здорово (и полагаю, полезно многим) - если бы кто-то умный объяснил на пальцах "чо-как вощще"
источник

SS

Sergey Sheremeta in Data Engineers
или это уже пресловутые "монады в Скале", которые не нужны дата-инженерам?
источник

AS

Andrey Smirnov in Data Engineers
Sergey Sheremeta
или это уже пресловутые "монады в Скале", которые не нужны дата-инженерам?
нет тут монад, обыкновенные итераторы
источник

ИК

Иван Калининский... in Data Engineers
Andrey Smirnov
нет тут монад, обыкновенные итераторы
Iterator[_] считается монадой?
источник

EK

Evgenii Kuznetcov in Data Engineers
Sergey Sheremeta
это отсюда - Concurrency-In-Spark. наверное, можно считать библиотечным кодом
Окей, я понял, что происходит
источник

AS

Andrey Smirnov in Data Engineers
Иван Калининский
Iterator[_] считается монадой?
а Future[_]?
источник

ИК

Иван Калининский... in Data Engineers
Andrey Smirnov
а Future[_]?
кто бы объяснил!
источник