Size: a a a

2019 November 15

M

Mi in Data Engineers
Скорее всего из-за lazy и отсутствия кеширования
источник

VS

Vadim Sukhanov in Data Engineers
Всем привет, кто-нибудь использовал debezium для капчуринга данных из MSSql?

есть несколько вопросов:
- Интересует были ли какие то грабли на которые наступали и если да то какие?
- Есть ли что-то что нужно учитывать в настройке mssql при включении cdc?
- Как сильно включение cdc начинает бить по перфомансу mssql?
источник

Ik

Ilia ksen in Data Engineers
Mi
покажите код создания
val flatDfFromXml:DataFrame = spark.createDataFrame(rowRDD, XmlUtils.schema())
источник

OP

O. Petr in Data Engineers
Ilia ksen
val flatDfFromXml:DataFrame = spark.createDataFrame(rowRDD, XmlUtils.schema())
cache для кеширования вызовите и потом будете одни и те же уиды получать
источник

Ik

Ilia ksen in Data Engineers
O. Petr
cache для кеширования вызовите и потом будете одни и те же уиды получать
спасибо
источник

ME

Max Efremov in Data Engineers
Ilia ksen
Такая проблема при создании датафрейма, создаю id с помощью UUID.randomUUID.toString . Но при дальнейшей работе с этим датафреймом, например селект, джоин и т д, спарк несколько раз вызывает этот метод
Закэшируй))
источник

ME

Max Efremov in Data Engineers
А, уже написали, сорри
источник

Ik

Ilia ksen in Data Engineers
Max Efremov
А, уже написали, сорри
👌
источник

ME

Max Efremov in Data Engineers
O. Petr
Теп с кастомной udf можно к такому привести, а засчет еще одной оконной и фильтра оставить только те где след строка с 0 ?
--------------------------------
-|ts|id|diff_prev_ts|sum_prevs_ts
--------------------------------
1|..|1..| 10 sec        |10
2|..|1..| 20 sec        |30
3|..|1..| 10 sec        |40
4|..|1..| 20 sec        |50
5|..|1..| 180 sec      |0
6|..|1..| 10 sec        |10
--------------------------------
В общем у меня вот так вышло:
timestamps = spark.createDataFrame(
   [
       [1, 1573820575],
       [2, 1573820580],
       [3, 1573820600],
       [4, 1573820603],
       [5, 1573820703],
       [6, 1573820713],
   ],
   ["id", "timestamp"]
)
previous_window = Window.partitionBy().orderBy("id")
timestamps = (
   timestamps.withColumn("previous_timestamp", lag(col("timestamp")).over(previous_window))
   .withColumn("time_diff", col("timestamp") - col("previous_timestamp"))
   .fillna({"time_diff": 0})
   .withColumn("is_new_session", when(col("time_diff") > 60, lit(1)).otherwise(lit(0)))
   .withColumn("session_number", lag(col("is_new_session"), default=0).over(previous_window) + col("is_new_session"))
   .withColumn("session_time_diff", when(col("is_new_session") == 1, lit(0)).otherwise(col("time_diff")))
)
timestamps.show()
timestamps.groupBy(["session_number"]).agg(
   first("id").alias("start_id"),
   spark_sum("session_time_diff").alias("duration")
).show()
источник

ME

Max Efremov in Data Engineers
+---+----------+------------------+---------+--------------+--------------+-----------------+
| id| timestamp|previous_timestamp|time_diff|is_new_session|session_number|session_time_diff|
+---+----------+------------------+---------+--------------+--------------+-----------------+
|  1|1573820575|              null|        0|             0|             0|                0|
|  2|1573820580|        1573820575|        5|             0|             0|                5|
|  3|1573820600|        1573820580|       20|             0|             0|               20|
|  4|1573820603|        1573820600|        3|             0|             0|                3|
|  5|1573820703|        1573820603|      100|             1|             1|                0|
|  6|1573820713|        1573820703|       10|             0|             1|               10|
+---+----------+------------------+---------+--------------+--------------+-----------------+
источник

ME

Max Efremov in Data Engineers
+--------------+--------+--------+
|session_number|start_id|duration|
+--------------+--------+--------+
|             0|       1|      28|
|             1|       5|      10|
+--------------+--------+--------+
источник

ME

Max Efremov in Data Engineers
Потенциальный косяк - пустой partitionBy у окна, это сильно замедлит выполнение
источник

ME

Max Efremov in Data Engineers
Там даже спарк ругается:
No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
источник

ME

Max Efremov in Data Engineers
Вопрос к аудиории: как сделать оптимальнее?)
источник

AS

Anton Shelin in Data Engineers
Max Efremov
Вопрос к аудиории: как сделать оптимальнее?)
а что колонки типа user_id нету?
источник

AS

Anton Shelin in Data Engineers
если нету то оптимальней не получится
источник

ME

Max Efremov in Data Engineers
да, логично
источник

AS

Anton Shelin in Data Engineers
Max Efremov
да, логично
хотя по логике можно из таймстемпа получить дату и партишенить по ней. если не страшно что будут разрывы на пересечении дня. хотя и их потом можно склеить
источник

В

Вадим in Data Engineers
Ребят, умеет кто нибудь в nifi?
источник

S

Stanislav in Data Engineers
Вадим
Ребят, умеет кто нибудь в nifi?
источник