I'm stuck at a step where I need to perform some action for each column value in my DataFrame, and that action requires traversing the DataFrame again. Here is a data sample:

Row(user_id='KxGeqg5ccByhaZfQRI4Nnw', gender='male', business_id='S75Lf-Q3bCCckQ3w7mSN2g', friends='my4q3Sy6Ei45V58N2l8VGw, R3zl9VKw63rPxSfBxbasWw, c-c64rURhBR8V8scSbwo7Q, tn6qogrDbb9hEKfRBGUUpw, pu_AQig2fw40PshvtgONPQ, IDrgtQccPN9c4rBn7yyk4Q, OIIx11vTeLN8EBcZrYXHKQ')

friends here is just a comma-separated list of other user_ids. What I'm trying to do is fetch some value for each of these friends for this specific user. Since these are user_ids, I'd need to query my DataFrame for them, which isn't allowed inside a UDF: I can neither call spark.sql nor reference a DataFrame and filter it, because both depend on the SparkSession object.

What different approach can I try here?

First, I tried creating a DataFrame and then filtering it:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

tempDF = sparkSession.sql("SELECT review_sentiment, user_id, business_id FROM events")

def getfriendsSentiment(friendsList, b_id):
  # For every friend, look up their sentiment for this business in tempDF
  listOfSentiments = []
  for friend_id in friendsList.split(','):
    try:
      listOfSentiments.append(tempDF.filter("user_id='" + friend_id + "' AND business_id='" + b_id + "'").rdd.flatMap(lambda x: x).collect()[0])
    except Exception:
      pass
  return listOfSentiments

friendsSentiment = udf(getfriendsSentiment, StringType())
businessReviewUserDfWithFriends = businessReviewUserDfWithFriends.withColumn('friendsSentimentToBusiness', friendsSentiment('friends', 'business_id'))

Error:

py4j.Py4JException: Method __getstate__([]) does not exist

Next, I tried creating a table and querying it:

sparkSession.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")

def getfriendsSentiment(friendsList, b_id):
  # For every friend, query the events table for their most common sentiment for this business
  listOfSentiments = []
  for friend_id in friendsList.split(','):
    try:
      listOfSentiments.append(spark.sql("SELECT review_sentiment FROM events WHERE user_id='" + friend_id + "' AND business_id='" + b_id + "' GROUP BY review_sentiment ORDER BY COUNT(review_sentiment) DESC LIMIT 1").rdd.flatMap(lambda x: x).collect()[0])
    except Exception:
      pass
  return listOfSentiments

Error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting........

What can I do to get around this?


1 Answer

You are not allowed to use SparkSession/DataFrame objects inside UDFs; they exist only on the driver and cannot be serialized and shipped to the executors, which is why both of your attempts fail with serialization errors.

The solution I think will work here is to explode each row on the friends column and then join back to the events data on (friend.id == user.id && friend.business_id == user.business_id).
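A minimal sketch of that approach, reusing the names from the question (friends as a comma-separated string, tempDF holding review_sentiment, user_id and business_id); the final collect_list aggregation is just one assumed way of folding the friends' sentiments back into a single column:

from pyspark.sql import functions as F

# One row per (user, business, friend): split the comma-separated friends string
friendsDF = businessReviewUserDfWithFriends.withColumn(
    'friend_id', F.explode(F.split('friends', r',\s*')))

# Join each friend against the events data on (friend_id == user_id, same business_id),
# then gather the friends' sentiments back per original (user, business) row
friendsSentimentDF = (
    friendsDF
    .join(tempDF.select(F.col('user_id').alias('friend_id'),
                        'business_id',
                        'review_sentiment'),
          on=['friend_id', 'business_id'],
          how='left')
    .groupBy('user_id', 'business_id')
    .agg(F.collect_list('review_sentiment').alias('friendsSentimentToBusiness'))
)

Everything stays in DataFrame operations, so no SparkSession or DataFrame reference is needed inside a UDF.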

A second solution is possible if the events table fits into memory: collect the events table once at the start and broadcast it to all executors; then you can use that data inside the UDF. This only works when events is a small table.
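A minimal sketch of the broadcast variant, assuming tempDF from the question fits on the driver and that each (user_id, business_id) pair maps to a single review_sentiment (otherwise aggregate while building the dict); eventsLookup and eventsBroadcast are names made up for this sketch, and since the UDF now returns a list its return type becomes ArrayType:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Collect the small events table on the driver and broadcast a plain dict
# keyed by (user_id, business_id) so executors can do local lookups
eventsLookup = {
    (row['user_id'], row['business_id']): row['review_sentiment']
    for row in tempDF.select('user_id', 'business_id', 'review_sentiment').collect()
}
eventsBroadcast = sparkSession.sparkContext.broadcast(eventsLookup)

def getfriendsSentiment(friendsList, b_id):
    # Look each friend up in the broadcast dict instead of querying a DataFrame
    lookup = eventsBroadcast.value
    return [lookup[(f.strip(), b_id)]
            for f in friendsList.split(',')
            if (f.strip(), b_id) in lookup]

friendsSentiment = udf(getfriendsSentiment, ArrayType(StringType()))
businessReviewUserDfWithFriends = businessReviewUserDfWithFriends.withColumn(
    'friendsSentimentToBusiness', friendsSentiment('friends', 'business_id'))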

