Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

Imagine a Spark DataFrame consisting of value observations from several variables. Each observation has its own timestamp, and the timestamps do not line up between variables, because a timestamp is generated whenever the value of a variable changes and is recorded.

#Variable     Time                Value
#852-YF-007   2016-05-10 00:00:00 0
#852-YF-007   2016-05-09 23:59:00 0
#852-YF-007   2016-05-09 23:58:00 0

Problem: I would like to bring all variables to the same frequency (for instance, 10 minutes) using forward fill. To visualize this, I copied a page from the book "Python for Data Analysis".

Question: How to do that on a Spark Dataframe in an efficient way?

[Image: page from the book "Python for Data Analysis"]



1 Answer

Question: How to do that on a Spark Dataframe in an efficient way?

A Spark DataFrame is simply not a good choice for an operation like this one. In general, SQL primitives are not expressive enough, and the PySpark DataFrame API does not provide the low-level access required to implement it.

Re-sampling itself, however, can be easily expressed using epoch / timestamp arithmetic. With data like this:

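from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as max_, min as min_

# Reuse the active session (or create one) so the snippet runs on its own
spark = SparkSession.builder.getOrCreate()

# Three weekly observations; parse the date strings into timestamps
df = (spark
    .createDataFrame(
        [("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],
        ["ts", "val"])
    .withColumn("ts", col("ts").cast("date").cast("timestamp")))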

we can re-sample input:

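# Bucket width in seconds (one day here; 10 * 60 would give 10-minute buckets)
day = 60 * 60 * 24
# Truncate each timestamp (seconds since the Unix epoch) to the start of its bucket
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day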

with_epoch = df.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()

and join with reference:

# Reference range 
ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")

(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("ts_resampled", col("epoch").cast("timestamp"))
    .show(15, False))

## +----------+---------------------+------+---------------------+   
## |epoch     |ts                   |val   |ts_resampled         |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null                 |null  |2012-06-13 02:00:00.0|
## |1339632000|null                 |null  |2012-06-14 02:00:00.0|
## |1339718400|null                 |null  |2012-06-15 02:00:00.0|
## |1339804800|null                 |null  |2012-06-16 02:00:00.0|
## |1339891200|null                 |null  |2012-06-17 02:00:00.0|
## |1339977600|null                 |null  |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null                 |null  |2012-06-20 02:00:00.0|
## |1340236800|null                 |null  |2012-06-21 02:00:00.0|
## |1340323200|null                 |null  |2012-06-22 02:00:00.0|
## |1340409600|null                 |null  |2012-06-23 02:00:00.0|
## |1340496000|null                 |null  |2012-06-24 02:00:00.0|
## |1340582400|null                 |null  |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+
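Note that ts_resampled in the output above is shifted relative to ts. The integer division aligns buckets to UTC day boundaries, while timestamps are displayed in the session time zone. The two-hour shift suggests a UTC+2 session, which is an assumption here, not something the output states. A minimal plain-Python sketch of the same arithmetic, without Spark (the helper name floor_to_bucket is made up for illustration):

```python
from datetime import datetime, timedelta, timezone


def floor_to_bucket(epoch_seconds: int, width: int) -> int:
    """Truncate an epoch (in seconds) to the start of its bucket of the given width."""
    return (epoch_seconds // width) * width


day = 60 * 60 * 24
# Assumed session time zone (UTC+2), inferred from the 02:00:00 offsets in the output.
local = timezone(timedelta(hours=2))

# Local midnight on 2012-06-13 is 22:00 UTC on 2012-06-12,
# so it falls into the UTC bucket that starts on 2012-06-12.
ts = datetime(2012, 6, 13, 0, 0, 0, tzinfo=local)
epoch = floor_to_bucket(int(ts.timestamp()), day)
resampled = datetime.fromtimestamp(epoch, tz=local)

print(epoch)                                    # 1339459200
print(resampled.strftime("%Y-%m-%d %H:%M:%S"))  # 2012-06-12 02:00:00

# The same helper yields the 10-minute grid asked for in the question:
# 2016-05-09 23:59:00 UTC is truncated to 23:50:00 UTC.
ten_minutes = 10 * 60
print(floor_to_bucket(1462838340, ten_minutes))  # 1462837800
```

If buckets should start at local midnight instead, one option is to add the zone offset before truncating and subtract it afterwards.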

In Spark >= 3.1 replace

col("epoch").cast("timestamp")

with

from pyspark.sql.functions import timestamp_seconds

timestamp_seconds("epoch")

Using low-level APIs, it is possible to fill data like this, as I've shown in my answer to "Spark / Scala: forward fill with last observation". Using RDDs, we could also avoid shuffling the data twice (once for the join, once for the reordering).
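To make the fill step concrete, here is a minimal single-machine sketch in plain Python of what has to happen after the join: walk the reference grid in order and carry the last observed value forward. It illustrates the logic only and is not Spark code; forward_fill is a hypothetical helper, and the epochs and values are taken from the example output above.

```python
from typing import Dict, List, Optional, Tuple


def forward_fill(
    grid: List[int], observed: Dict[int, float]
) -> List[Tuple[int, Optional[float]]]:
    """Left-join observations onto the grid and carry the last seen value forward."""
    filled = []
    last = None  # stays None until the first observation is seen
    for epoch in grid:
        if epoch in observed:
            last = observed[epoch]
        filled.append((epoch, last))
    return filled


day = 60 * 60 * 24
# Bucketed epochs and their values, as in the joined output
observed = {1339459200: 0.694, 1340064000: -2.669, 1340668800: 0.245}
# Reference grid: one point per day between the first and last observation
grid = list(range(min(observed), max(observed) + 1, day))

result = forward_fill(grid, observed)
print(len(result))  # 15
print(result[1])    # (1339545600, 0.694)
print(result[7])    # (1340064000, -2.669)
```

The loop depends on seeing rows in time order, which is exactly the sequential access that the DataFrame API does not expose directly.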

But there is a much more important problem here. Spark performs optimally when a problem can be reduced to element-wise or partition-wise computations. Forward fill is a case where this is possible, but as far as I am aware it is typically not possible with commonly used time series models, and if an operation requires sequential access, Spark won't provide any benefit at all.
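One way to see why forward fill can be reduced to partition-wise work is a two-pass scheme: first collect the last non-null value of each partition, then fill every partition independently, seeded with the value carried over from the partitions before it. The sketch below simulates partitions as plain Python lists and assumes the data is already sorted and split by time. All function names are hypothetical, and this is an illustration of the general idea, not necessarily the exact approach of the linked answer.

```python
from typing import List, Optional

Partition = List[Optional[float]]


def last_non_null(part: Partition) -> Optional[float]:
    """Return the last non-null value in a partition, or None if there is none."""
    for value in reversed(part):
        if value is not None:
            return value
    return None


def fill_partition(part: Partition, carry: Optional[float]) -> Partition:
    """Forward-fill one partition, starting from the value carried into it."""
    out = []
    last = carry
    for value in part:
        if value is not None:
            last = value
        out.append(last)
    return out


def partitioned_forward_fill(partitions: List[Partition]) -> List[Partition]:
    # Pass 1: one small summary per partition (cheap to collect centrally).
    lasts = [last_non_null(p) for p in partitions]
    # Central step: compute the value carried into each partition.
    carries = []
    carry = None
    for last in lasts:
        carries.append(carry)
        if last is not None:
            carry = last
    # Pass 2: each partition can now be filled independently of the others.
    return [fill_partition(p, c) for p, c in zip(partitions, carries)]


parts = [[0.694, None], [None, None], [-2.669, None, None]]
print(partitioned_forward_fill(parts))
# [[0.694, 0.694], [0.694, 0.694], [-2.669, -2.669, -2.669]]
```

Only the per-partition summaries need sequential handling; the rows themselves never have to be processed in a single global pass.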

So if you work with series that are large enough to require a distributed data structure, you'll probably want to aggregate them into some object that can be easily handled by a single machine, and then use your favorite non-distributed tool to handle the rest.

If you work with multiple time series, each of which can be handled in memory, then there is of course sparkts, but I know you're already aware of that.

