Size: a a a

2020 December 20

A

Alexander in Data Engineers
Andrey Smirnov
Я использую вариант через udf
Попробывал сделать с UDFs на таком уровне:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
def say_hello(name : str) -> str:
   return f"Hello {name}"
assert say_hello("Summer") == "Hello Summer"
say_hello_udf = udf(lambda name: say_hello(name), StringType())
df = spark.createDataFrame([("Rick,"),("Morty,")], ["name"])
df.withColumn("greetings", say_hello_udf(col("name")).show()
Только вместо функции say_hello поставил свою функцию с inference. Запустилось через spark-submit. Но вычисления не ускоряются от числа нод... Нужно как-то хитро каждой ноде сказать что делать. Плюс у меня подозрение что функция описанная выше вообще спарком не параллелиться. То есть расчёт происходит не на векторах, а на отдельных склярах друг за другом.
источник

A

Alexander in Data Engineers
Дмитрий
Можно пользоватся всеми библиотеками python.
Вы имеете ввиду если использовать UDFs?
источник

Д

Дмитрий in Data Engineers
Если тебе нет необходимости использовать весь кластер для ускорения обучения, то все просто. Если надо то все значительно труднее, и общего решения нет, все зависит от модели. Я допустим считаю для каждого юнита свою модель, поэтому все делается через groupby и udf.
источник

Д

Дмитрий in Data Engineers
Унитов у меня блольше 2000, поэтому хороше считается на кластере, но это пока только в r&d.
источник

AS

Andrey Smirnov in Data Engineers
Alexander
Попробывал сделать с UDFs на таком уровне:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
def say_hello(name : str) -> str:
   return f"Hello {name}"
assert say_hello("Summer") == "Hello Summer"
say_hello_udf = udf(lambda name: say_hello(name), StringType())
df = spark.createDataFrame([("Rick,"),("Morty,")], ["name"])
df.withColumn("greetings", say_hello_udf(col("name")).show()
Только вместо функции say_hello поставил свою функцию с inference. Запустилось через spark-submit. Но вычисления не ускоряются от числа нод... Нужно как-то хитро каждой ноде сказать что делать. Плюс у меня подозрение что функция описанная выше вообще спарком не параллелиться. То есть расчёт происходит не на векторах, а на отдельных склярах друг за другом.
А сколько у вас партиций, на таком игрушечном примере естественно ничего не будет
источник

AS

Andrey Smirnov in Data Engineers
Потом можно прикрутить   arrow
источник

A

Alexander in Data Engineers
Andrey Smirnov
А сколько у вас партиций, на таком игрушечном примере естественно ничего не будет
Я не устанавливаю конкретное число в  spark-submit. Если я понял правильно вопрос.
источник

A

Alexander in Data Engineers
Alexander
Я не устанавливаю конкретное число в  spark-submit. Если я понял правильно вопрос.
Вот мой спарк сабмит:
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue default \
--num-executors 100 \
--executor-memory 2000M \
--driver-memory 4G \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.task.maxFailures=1 \
--conf spark.executor.instances=100 \
--conf spark.blacklist.enabled=False \
--conf spark.oath.dockerImage="ml/rhel8_mlbundle:2020.01.1" \
--py-files your_app.py \
app.py
источник

AS

Andrey Smirnov in Data Engineers
Alexander
Я не устанавливаю конкретное число в  spark-submit. Если я понял правильно вопрос.
А сколько строк в dataframe?
источник

SS

Sergey Sheremeta in Data Engineers
дяденьки, здравствуйте!
подскажите почему так:
- есть spark structured streaming джоба, которая вычитывает из нескольких топиков Кафки по subscribePattern
- вижу, что каждый запуск микробатча (раз в минуту) приводит к росту использования памяти на драйвере
- знаю, что оффсеты/коммиты фиксирует именно драйвер, но тут получается что каждый экзекутор отправляет на драйвер и прочитанную порцию данных - как так???
источник

SS

Sergey Sheremeta in Data Engineers
источник

SS

Sergey Sheremeta in Data Engineers
совокупный размер вычитанных из топиков Кафки данных как раз 350-400 мегабайт за микробатч, на них и прирастает объем «Used heap»
источник

СХ

Старый Хрыч... in Data Engineers
😐если я правильно помню то в кафке данные сжаты
источник

СХ

Старый Хрыч... in Data Engineers
и  то что в кафке весит 350-400 метров вне её может весить куда больше
источник

SS

Sergey Sheremeta in Data Engineers
явно на драйвер ничего не отправляю вроде, план выглядит так:
источник

SS

Sergey Sheremeta in Data Engineers
причем и кол-во файлов, создаваемых в HDFS за каждый микробатч не равно единице. то бишь пишут-то воркеры, а не драйвер…
зачем на драйвер все это летит???
источник

SS

Sergey Sheremeta in Data Engineers
Старый Хрыч
и  то что в кафке весит 350-400 метров вне её может весить куда больше
я смотрю метрики самой спарк-жопы
источник

СХ

Старый Хрыч... in Data Engineers
может не сразу всё скидывает, хз, а в логах что?
источник

SK

Sergey Klimov in Data Engineers
Alexander
Попробывал сделать с UDFs на таком уровне:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
def say_hello(name : str) -> str:
   return f"Hello {name}"
assert say_hello("Summer") == "Hello Summer"
say_hello_udf = udf(lambda name: say_hello(name), StringType())
df = spark.createDataFrame([("Rick,"),("Morty,")], ["name"])
df.withColumn("greetings", say_hello_udf(col("name")).show()
Только вместо функции say_hello поставил свою функцию с inference. Запустилось через spark-submit. Но вычисления не ускоряются от числа нод... Нужно как-то хитро каждой ноде сказать что делать. Плюс у меня подозрение что функция описанная выше вообще спарком не параллелиться. То есть расчёт происходит не на векторах, а на отдельных склярах друг за другом.
Посмотрите в сторону pandas_udf, если хотите на векторах и spark версии 2.3 и больше
источник

SK

Sergey Klimov in Data Engineers
+ стоит избежать инициализации модели на каждый скаляр или вектор в udf, тоже может съедать производительность. Но тогда не всегда можно легко забродкастить модель. Был случай, когда в либе где-то в кишках были статические поля, которые превращались в тыкву при бродкасте объекта.
источник