I have some data that I want to group by a certain column and then aggregate a series of fields over a rolling time window within each group.
Here is some example data:
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
    Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
    Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
    Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
    Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
    Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4),
])
I want to group by group_by, then create time windows that start at the earliest date in each group and extend until there have been 30 days with no entry for that group. After those 30 days are over, the next time window starts with the date of the next row that did not fall in the previous window.

I then want to aggregate over each window, for example taking the average of get_avg and the first result of get_first.
So the output for this example should be:
group_by    first date of window    get_avg    get_first
group1      2016-01-01              5          1
group2      2016-02-01              20         3
group2      2016-04-02              8          4
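For context, here is a minimal sketch of the kind of approach I have in mind, assuming gap-based sessionization: flag rows whose distance from the previous entry in the same group exceeds 30 days using lag/datediff, turn those flags into a window id with a running sum, then aggregate per window. The helper columns gap, new_window, and window_id are names I made up, and I am not sure first() is deterministic here:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order rows within each group by date.
w = Window.partitionBy('group_by').orderBy('date')

sessions = (df
    .withColumn('date', F.to_date('date'))
    # Days elapsed since the previous entry in the same group.
    .withColumn('gap', F.datediff('date', F.lag('date').over(w)))
    # The first row of a group, or a gap of more than 30 days,
    # starts a new window.
    .withColumn('new_window',
                F.when(F.col('gap').isNull() | (F.col('gap') > 30), 1)
                 .otherwise(0))
    # A running sum of the flags assigns a window id within each group.
    .withColumn('window_id', F.sum('new_window').over(w)))

result = (sessions
    .groupBy('group_by', 'window_id')
    .agg(F.min('date').alias('first_date_of_window'),
         F.avg('get_avg').alias('get_avg'),
         # Note: first() depends on row order within the group,
         # which groupBy alone does not guarantee.
         F.first('get_first').alias('get_first'))
    .drop('window_id'))

result.show()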
Edit: sorry, I realized my question was not specified properly. I actually want a window that ends after 30 days of inactivity. I have modified the group2 portion of the example accordingly: the gap between 2016-02-29 and 2016-04-02 is 33 days, so 2016-04-02 starts a new window for group2.