Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

def trainBestSeller(events: RDD[BuyEvent], n: Int, itemStringIntMap: BiMap[String, Int]): Map[String, Array[(Int, Int)]] = {
    val itemTemp = events
        // map item from string to integer index; events with unknown items are dropped
        .flatMap {
            case BuyEvent(user, item, category, count) if itemStringIntMap.contains(item) =>
                Some(((itemStringIntMap(item), category), count))
            case _ => None
        }
        // cache because the RDD is reused by both aggregations below
        .cache()

    // top view within each category:
    val bestSeller_Category: Map[String, Array[(Int, Int)]] = itemTemp.reduceByKey(_ + _)
        .map(row => (row._1._2, (row._1._1, row._2)))
        .groupByKey
        .map { case (c, itemCounts) =>
            (c, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
        }
        .collectAsMap.toMap

    // top view across all categories => category ALL
    val bestSeller_All: Map[String, Array[(Int, Int)]] = itemTemp.reduceByKey(_ + _)
        .map(row => ("ALL", (row._1._1, row._2)))
        .groupByKey
        .map { case (c, itemCounts) =>
            (c, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
        }
        .collectAsMap.toMap


    // merge the two maps bestSeller_All and bestSeller_Category
    val bestSeller = bestSeller_Category ++ bestSeller_All
    bestSeller
}
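
To make the logic of the function above easier to check without a Spark cluster, here is a minimal sketch of the same top-n-per-category computation on plain Scala collections. It assumes a BuyEvent case class with the four fields matched above (the field types are guesses) and swaps the BiMap for an ordinary Map and the RDD for a Seq. The names BestSellerSketch and topSellers are hypothetical and not part of the original code.

```scala
// Hypothetical stand-in for the BuyEvent type used above; field types are assumed.
case class BuyEvent(user: String, item: String, category: String, count: Int)

object BestSellerSketch {
  // Plain-collections analogue of trainBestSeller (Seq instead of RDD, Map instead of BiMap).
  def topSellers(events: Seq[BuyEvent], n: Int, itemIndex: Map[String, Int]): Map[String, Array[(Int, Int)]] = {
    // Sum the counts per (item index, category) and drop events whose item is unknown.
    val totals: Map[(Int, String), Int] = events
      .collect { case BuyEvent(_, item, category, count) if itemIndex.contains(item) =>
        ((itemIndex(item), category), count)
      }
      .groupBy(_._1)
      .map { case (key, rows) => key -> rows.map(_._2).sum }

    // Keep the n (item, total) pairs with the highest totals.
    def topN(rows: Iterable[(Int, Int)]): Array[(Int, Int)] =
      rows.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n)

    // Top n within each category.
    val perCategory: Map[String, Array[(Int, Int)]] = totals.toSeq
      .groupBy { case ((_, category), _) => category }
      .map { case (category, rows) =>
        category -> topN(rows.map { case ((item, _), total) => (item, total) })
      }

    // Top n over all categories, stored under the key "ALL".
    val overall = "ALL" -> topN(totals.toSeq.map { case ((item, _), total) => (item, total) })

    perCategory + overall
  }
}
```

Like the original, the sketch computes the summed totals once and derives both rankings from them. In the Spark version the same idea would mean caching the result of reduceByKey rather than only the un-reduced events, since reduceByKey(_ + _) is currently evaluated twice.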

1 Answer

List processing

Your list processing seems okay. I did a small recheck:

object Recheck {

  def main( args: Array[String] ) : Unit = {

    // local stand-ins for the types in question, not the real library classes
    case class JString(x: Int)
    case class CompactBuffer(x: Int, y: Int)

    val tuple: (List[JString], CompactBuffer) = ( List( JString(2435), JString(3464)), CompactBuffer(1,4) )

    // variant 1: pair every list element with the second tuple component
    val result: List[(JString, CompactBuffer)] = tuple._1.map((_, tuple._2))
    // variant 2: the same, with named intermediate values
    val result2: List[(JString, CompactBuffer)] = {
      val l = tuple._1
      val cb = tuple._2
      l.map( x => (x,cb) )
    }

    println(result)
    println(result2)
  }
}

Both println calls print the same result (as expected):

List((JString(2435),CompactBuffer(1,4)), (JString(3464),CompactBuffer(1,4)))
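
If your real data holds many such (list, value) tuples rather than a single one, the same pairing can be applied to all of them with flatMap. This is only a sketch under that assumption: JString and CompactBuffer are again local stand-ins, and PairingSketch and pairAll are hypothetical names.

```scala
object PairingSketch {
  // Stand-ins mirroring the recheck above; not the real json4s / Spark types.
  case class JString(x: Int)
  case class CompactBuffer(x: Int, y: Int)

  // Pair every element of each list with the value that accompanies that list,
  // then flatten the per-tuple results into one list.
  def pairAll(tuples: List[(List[JString], CompactBuffer)]): List[(JString, CompactBuffer)] =
    tuples.flatMap { case (keys, cb) => keys.map(key => (key, cb)) }

  def main(args: Array[String]): Unit = {
    val tuples = List(
      (List(JString(2435), JString(3464)), CompactBuffer(1, 4)),
      (List(JString(99)), CompactBuffer(7, 8))
    )
    println(pairAll(tuples))
  }
}
```

The pattern match in flatMap names both tuple components, which avoids the ._1 / ._2 accessors and makes it harder to mix them up.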

Further analysis

If that does not solve your problem, further analysis is required:

  • Where do the types JString (from org.json4s.JsonAST?) and CompactBuffer (Spark, I suppose) come from?
  • What exactly does the code that creates pair look like? What exactly are you doing? Please provide code excerpts!
