Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I'm trying to learn to use DataFrames and Datasets more, in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x, y) => x + y), but I don't see that function for Dataset, so I decided to write one.

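import scala.collection.mutable

// Turn every element into a single-entry map, then merge the maps pairwise,
// summing the counts of keys that occur in both.
someRdd.map(x => ((x.fromId, x.toId), 1)).map(x => mutable.Map(x)).reduce((x, y) => {
  val result = mutable.HashMap.empty[(Long, Long), Int]
  // Union of the keys of both maps
  val keys = mutable.HashSet.empty[(Long, Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    // A key missing from one side contributes 0
    val s1 = if (x.contains(elem)) x(elem) else 0
    val s2 = if (y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})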

However, this returns everything to the driver. How would you write this so that it returns a Dataset? Maybe use mapPartitions and do it there?

Note: this compiles but does not run, because there are no encoders for Map yet.

1 Answer

I assume your goal is to translate this idiom to Datasets:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)

Currently, the closest solution I have found with the Dataset API looks like this:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)                            
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, the map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but over the FieldType.
// [2] Required because reduceGroups returns a Dataset[(K, V)],
//     not knowing that our V's are already key-value pairs.
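
For reference, here is a minimal self-contained sketch of the pipeline above. The Record case class, the sample rows, the object name, and the local SparkSession are assumptions made up for illustration; only the four-step chain comes from the snippet above.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type; the field names mirror someKey/someField above.
case class Record(someKey: String, someField: Int)

object DatasetReduceByKeySketch {
  // Sums someField per someKey using only the typed Dataset API.
  def sumByKey(spark: SparkSession, ds: Dataset[Record]): Dataset[(String, Int)] = {
    import spark.implicits._ // encoders for String and (String, Int)
    ds.map(x => (x.someKey, x.someField))           // [1] project to (key, value)
      .groupByKey(_._1)                             // group the pairs by their key
      .reduceGroups((a, b) => (a._1, a._2 + b._2))  // yields (key, (key, sum))
      .map(_._2)                                    // [2] drop the duplicated outer key
  }

  def main(args: Array[String]): Unit = {
    // Local session, for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dataset-reduceByKey-sketch")
      .getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(Record("a", 1), Record("b", 2), Record("a", 3)).toDS()
      sumByKey(spark, ds).collect().foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

The SparkSession is passed in explicitly only so that its implicits (the encoders) can be imported inside the helper.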

This doesn't look very elegant, and according to a quick benchmark it is also much slower than the RDD version, so maybe we are missing something here.
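
One thing that may be worth benchmarking against the typed version is the untyped groupBy / agg route, converting back to a typed Dataset at the end. This is not part of the approach above, just a sketch of an alternative: it keeps the aggregation in Spark SQL's built-in sum instead of a Scala lambda, which is often cited as faster, but I have not measured it for this case. The Record case class and the object name are assumptions for illustration. Note that sum over an Int column produces a Long column, hence the (String, Long) result type.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.sum

// Hypothetical record type; the field names mirror someKey/someField above.
case class Record(someKey: String, someField: Int)

object UntypedAggSketch {
  // Groups by column name and sums with the built-in SQL aggregate.
  def sumByKey(spark: SparkSession, ds: Dataset[Record]): Dataset[(String, Long)] = {
    import spark.implicits._ // encoder for (String, Long)
    ds.groupBy("someKey")        // untyped: RelationalGroupedDataset
      .agg(sum("someField"))     // DataFrame with columns (someKey, sum(someField))
      .as[(String, Long)]        // back to a typed Dataset; tuples map by position
  }

  def main(args: Array[String]): Unit = {
    // Local session, for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("untyped-agg-sketch")
      .getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(Record("a", 1), Record("b", 2), Record("a", 3)).toDS()
      sumByKey(spark, ds).collect().foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

The trade-off is that column names are plain strings here, so a typo in "someKey" only shows up at runtime rather than at compile time.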

Note: An alternative might be to use groupByKey(_.someKey) as a first step. The problem is that groupByKey changes the type from a regular Dataset to a KeyValueGroupedDataset, which does not have a regular map function. Instead it offers mapGroups, which does not seem very convenient: it hands the values of each group to the function as an Iterator and, according to its docstring, performs a shuffle.
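
To make the groupByKey-first route concrete, here is a sketch of two variants. The first uses mapGroups as described in the note. The second assumes Spark 2.1 or later, where KeyValueGroupedDataset also has a mapValues method that projects the values before reduceGroups, so no map is needed before or after. The Record case class and the object and method names are made up for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type; the field names mirror someKey/someField above.
case class Record(someKey: String, someField: Int)

object GroupByKeyFirstSketch {
  // Variant 1: mapGroups gets each key together with an Iterator over its rows.
  def viaMapGroups(spark: SparkSession, ds: Dataset[Record]): Dataset[(String, Int)] = {
    import spark.implicits._
    ds.groupByKey(_.someKey)
      .mapGroups((key, rows) => (key, rows.map(_.someField).sum))
  }

  // Variant 2 (assumes Spark 2.1+): project the values, then reduce them per key.
  def viaMapValues(spark: SparkSession, ds: Dataset[Record]): Dataset[(String, Int)] = {
    import spark.implicits._
    ds.groupByKey(_.someKey)   // KeyValueGroupedDataset[String, Record]
      .mapValues(_.someField)  // KeyValueGroupedDataset[String, Int]
      .reduceGroups(_ + _)     // Dataset[(String, Int)]
  }

  def main(args: Array[String]): Unit = {
    // Local session, for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("groupByKey-first-sketch")
      .getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(Record("a", 1), Record("b", 2), Record("a", 3)).toDS()
      viaMapGroups(spark, ds).collect().foreach(println)
      viaMapValues(spark, ds).collect().foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

Both variants return a Dataset of (key, sum) pairs. I have not benchmarked either of them against the approach above.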

