Window functions make it easy to split a stream of timestamped events into sessions. The idea: pull the previous row's timestamp next to each event with lag, flag the rows where the gap to the previous event exceeds 60 seconds, and turn those flags into session numbers with a running sum.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, first, lag, lit, when, sum as spark_sum

spark = SparkSession.builder.getOrCreate()

timestamps = spark.createDataFrame(
    [
        [1, 1573820575],
        [2, 1573820580],
        [3, 1573820600],
        [4, 1573820603],
        [5, 1573820703],
        [6, 1573820713],
    ],
    ["id", "timestamp"],
)

# Empty partitionBy(): the whole dataset goes through one partition (see below).
previous_window = Window.partitionBy().orderBy("id")

timestamps = (
    timestamps
    # Previous row's timestamp, placed next to the current row.
    .withColumn("previous_timestamp", lag(col("timestamp")).over(previous_window))
    # Gap to the previous event; the first row has none, so fill it with 0.
    .withColumn("time_diff", col("timestamp") - col("previous_timestamp"))
    .fillna({"time_diff": 0})
    # A gap of more than 60 seconds opens a new session.
    .withColumn("is_new_session", when(col("time_diff") > 60, lit(1)).otherwise(lit(0)))
    # Running sum of the flags numbers the sessions: 0, 0, 0, 0, 1, 1.
    .withColumn("session_number", spark_sum(col("is_new_session")).over(previous_window))
    # The gap that opens a session does not count toward its duration.
    .withColumn("session_time_diff", when(col("is_new_session") == 1, lit(0)).otherwise(col("time_diff")))
)
timestamps.show()

timestamps.groupBy("session_number").agg(
    first("id").alias("start_id"),
    spark_sum("session_time_diff").alias("duration"),
).show()
The first show() prints the per-event view:
+---+----------+------------------+---------+--------------+--------------+-----------------+
| id| timestamp|previous_timestamp|time_diff|is_new_session|session_number|session_time_diff|
+---+----------+------------------+---------+--------------+--------------+-----------------+
| 1|1573820575| null| 0| 0| 0| 0|
| 2|1573820580| 1573820575| 5| 0| 0| 5|
| 3|1573820600| 1573820580| 20| 0| 0| 20|
| 4|1573820603| 1573820600| 3| 0| 0| 3|
| 5|1573820703| 1573820603| 100| 1| 1| 0|
| 6|1573820713| 1573820703| 10| 0| 1| 10|
+---+----------+------------------+---------+--------------+--------------+-----------------+
The 100-second jump between ids 4 and 5 is the only gap above the threshold, so row 5 opens session 1. The aggregation then reports each session's first event and total duration:
+--------------+--------+--------+
|session_number|start_id|duration|
+--------------+--------+--------+
| 0| 1| 28|
| 1| 5| 10|
+--------------+--------+--------+
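As a cross-check (not part of the original example), the gaps inside a session telescope, so each duration also equals the session's last timestamp minus its first:

from pyspark.sql.functions import max as spark_max, min as spark_min

# 1573820603 - 1573820575 = 28 and 1573820713 - 1573820703 = 10,
# matching the summed session_time_diff above.
timestamps.groupBy("session_number").agg(
    (spark_max("timestamp") - spark_min("timestamp")).alias("duration")
).show()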
One caveat: we did not pass any columns to partitionBy when defining the window. Spark then has to move all the data to a single partition, which can slow execution down dramatically, and it says so in the logs:
No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
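On real data you would avoid this by partitioning the window on a grouping key. A minimal sketch, assuming a hypothetical events DataFrame with a user_id column (neither appears in the toy data above):

# Hypothetical: user_id and events are stand-ins for a real grouping key and dataset.
per_user_window = Window.partitionBy("user_id").orderBy("timestamp")
events = events.withColumn(
    "previous_timestamp", lag(col("timestamp")).over(per_user_window)
)

With a partition key in place, the warning goes away and each user's events are sessionized independently and in parallel.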