I'm trying to learn to use DataFrames and Datasets more, in addition to RDDs. For an RDD, I know I can do `someRDD.reduceByKey((x,y) => x + y)`, but I don't see that function for Dataset, so I decided to write one:
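```scala
import scala.collection.mutable

// Turn each row into a one-entry map ((fromId, toId) -> 1), then merge the maps pairwise.
someRdd
  .map(x => ((x.fromId, x.toId), 1))
  .map(x => mutable.Map(x))
  .reduce((x, y) => {
    val result = mutable.HashMap.empty[(Long, Long), Int]
    // Union of the keys from both partial maps
    val keys = mutable.HashSet.empty[(Long, Long)]
    y.keys.foreach(z => keys += z)
    x.keys.foreach(z => keys += z)
    // Sum the counts for each key, treating a missing key as 0
    for (elem <- keys) {
      val s1 = if (x.contains(elem)) x(elem) else 0
      val s2 = if (y.contains(elem)) y(elem) else 0
      result(elem) = s1 + s2
    }
    result
  })
```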
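For comparison, the merge function passed to `reduce` in the code above can be written more compactly by folding one map into the other. This is a plain-Scala sketch (no Spark involved) that uses immutable maps; `MergeCounts` and `merge` are hypothetical names chosen for illustration:

```scala
object MergeCounts {
  // Adds every (key, count) pair of `y` into `x`, treating a missing key as 0.
  // Same result as the explicit key-union loop, but without mutable state.
  def merge[K](x: Map[K, Int], y: Map[K, Int]): Map[K, Int] =
    y.foldLeft(x) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) + v)
    }
}
```

Summing counts this way is associative and commutative, so it is a valid `reduce` function, but on its own it does not change the fact that `reduce` brings the final map back to the driver.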
However, `reduce` is an action, so this returns everything to the driver. How would you write this so that it returns a `Dataset`? Maybe use `mapPartitions` and do the merging there?
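One possible direction, sketched under assumptions: a hypothetical case class `Edge(fromId: Long, toId: Long)` stands in for the real row type, and Spark 2.1+ is assumed (for `KeyValueGroupedDataset.mapValues`). `countPairs` uses the typed `groupByKey`/`reduceGroups` pair, which is the closest `Dataset` counterpart to `reduceByKey`; `countPairsViaPartitions` follows the `mapPartitions` idea, emitting per-partition `(key, count)` tuples (tuples have encoders, unlike `Map` here) and then combining them by key. Both return a `Dataset` rather than collecting to the driver.

```scala
import scala.collection.mutable
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical row type standing in for the real data with fromId/toId.
case class Edge(fromId: Long, toId: Long)

object PairCounts {
  // Typed counterpart of rdd.map(e => ((e.fromId, e.toId), 1)).reduceByKey(_ + _)
  def countPairs(spark: SparkSession, ds: Dataset[Edge]): Dataset[((Long, Long), Int)] = {
    import spark.implicits._
    ds.groupByKey(e => (e.fromId, e.toId)) // group rows by the (fromId, toId) key
      .mapValues(_ => 1)                    // each row contributes a count of 1
      .reduceGroups(_ + _)                  // sum per key; the result stays distributed
  }

  // mapPartitions variant: count locally inside each partition first,
  // then merge the partial counts across partitions by key.
  def countPairsViaPartitions(spark: SparkSession, ds: Dataset[Edge]): Dataset[((Long, Long), Int)] = {
    import spark.implicits._
    val partial: Dataset[((Long, Long), Int)] = ds.mapPartitions { rows =>
      val counts = mutable.HashMap.empty[(Long, Long), Int]
      rows.foreach { e =>
        val key = (e.fromId, e.toId)
        counts(key) = counts.getOrElse(key, 0) + 1
      }
      counts.iterator // one (key, count) tuple per distinct key in this partition
    }
    partial
      .groupByKey(_._1)                            // regroup partial counts by key
      .reduceGroups((a, b) => (a._1, a._2 + b._2)) // add the partial counts
      .map(_._2)                                   // drop the duplicated grouping key
  }
}
```

The `mapPartitions` variant is mostly useful for experimenting: Spark's typed aggregation behind `reduceGroups` may already combine values within each partition before the shuffle, so it is worth comparing the two physical plans with `explain()` before keeping the extra step.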
Note: this compiles but does not run, because there are no encoders for `Map` yet.
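If the only goal is to make the `Map`-based `reduce` run, one workaround is an opaque Kryo encoder built with `Encoders.kryo`. This is a sketch assuming a hypothetical `Edge(fromId: Long, toId: Long)` row type and the `KryoMapReduce` name chosen for illustration. Note that `reduce` is still an action, so the merged map still ends up on the driver, and Kryo-encoded values are stored as binary blobs that Spark SQL cannot inspect or optimize.

```scala
import scala.collection.mutable
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Hypothetical row type standing in for the real data with fromId/toId.
case class Edge(fromId: Long, toId: Long)

object KryoMapReduce {
  type Counts = mutable.Map[(Long, Long), Int]

  // Opaque encoder: each map is serialized with Kryo into a single binary column.
  implicit val countsEncoder: Encoder[Counts] = Encoders.kryo[Counts]

  def countPairs(ds: Dataset[Edge]): Map[(Long, Long), Int] =
    ds.map(e => mutable.Map((e.fromId, e.toId) -> 1)) // resolved via countsEncoder
      .reduce { (x, y) =>
        // Build a fresh map instead of mutating either input.
        val merged: Counts = mutable.HashMap.empty
        for (k <- x.keySet ++ y.keySet)
          merged(k) = x.getOrElse(k, 0) + y.getOrElse(k, 0)
        merged
      }
      .toMap // a plain local value on the driver
}
```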