Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

I have a dataset of around 190 GB, split into 1000 partitions.

My EMR cluster allows a maximum of 10 r5a.2xlarge TASK nodes and 2 CORE nodes. Each node has 64 GB of memory and 128 GB of EBS storage.

For my Spark job, I set executor-cores 5, driver-cores 5, executor-memory 40g, driver-memory 50g, spark.yarn.executor.memoryOverhead=10g, spark.sql.shuffle.partitions=500, and spark.dynamicAllocation.enabled=true.

But my job keeps failing with errors like

spark.shuffle.MetadataFetchFailedException
spark.shuffle.FetchFailedException
java.io.IOException: No space left on device
Container Lost
etc...

Many of the answers I found online for these kinds of issues say to increase memoryOverhead, which I did, from 2G to 10G. My executor memory plus memoryOverhead now totals 50G: 40G for the executor and 10G for overhead. But I think I am reaching the limit, since I won't be able to go above 56G.
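As a rough check of the numbers in this paragraph, here is a minimal Python sketch. On YARN, the container requested for each executor is the executor memory plus the memory overhead. The function name `container_request_gb` is made up for illustration, and treating 56G as a per-container ceiling is an assumption based on the figure quoted above.

```python
def container_request_gb(executor_memory_gb: int, memory_overhead_gb: int) -> int:
    """Memory YARN must grant for one executor: executor memory plus overhead."""
    return executor_memory_gb + memory_overhead_gb


# Values from the question: executor-memory 40g, memoryOverhead 10g.
request_gb = container_request_gb(executor_memory_gb=40, memory_overhead_gb=10)

# Assumption: 56G is the per-container ceiling the question refers to.
CEILING_GB = 56
headroom_gb = CEILING_GB - request_gb

print(f"request={request_gb}G headroom={headroom_gb}G")  # request=50G headroom=6G
```

With only about 6G of headroom left, raising the overhead further means taking memory away from the executor itself.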

I thought I had done everything possible to optimize my Spark job:

  1. Increase partitions
  2. Increase spark.sql.shuffle.partitions
  3. Increase executor and overhead memory

But my job still fails. Is there anything else I can try? Should I increase my overhead even more, so that the split between executor memory and overhead memory is 50/50? The memory profile of my job from Ganglia looks something like this:

(The steep drop is when the cluster flushed all the executor nodes due to them being dead)

[Image: Ganglia memory profile of the job]

Any insight would be greatly appreciated

Thank You

EDIT: [SOLUTION]

I am appending the exact solution to my post. It solved my problem thanks to Debuggerrr and the suggestions in his answer.

  • I had a large DataFrame that I was reusing after doing many computations on other DataFrames. By using the persist() method (suggested by Debuggerrr), I was able to save it to memory and disk (MEMORY_AND_DISK) and simply call it back without parts of it being cleaned up by the GC.
  • I also followed the best practices blog Debuggerrr mentioned in his answer and calculated the correct executor memory, number of executors, etc. But what I had failed to do was disable spark.dynamicAllocation.enabled. The blog states that it is best to set the property to false when calculating the resources manually, since Spark tends to misallocate resources if your calculation doesn't line up with it. Once I set it to false and set the correct executor and Spark attributes, it worked like a charm!

[EDIT 2]: The parameters that specifically worked for my job are:

--executor-cores 5 --driver-cores 5 --executor-memory 44g --driver-memory 44g --num-executors 9 --conf spark.default.parallelism=100 --conf spark.sql.shuffle.partitions=300 --conf spark.yarn.executor.memoryOverhead=11g --conf spark.shuffle.io.retryWait=180s --conf spark.network.timeout=800s --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.dynamicAllocation.enabled=false
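To see what these parameters add up to across the cluster, here is a small Python sketch that parses a subset of the flags above and totals the memory and cores requested for executors. The helper names `parse_submit_args` and `gigabytes` are hypothetical, and the parser assumes every option takes exactly one value, which holds for this command line but not for spark-submit in general.

```python
import shlex

# A subset of the flags from the post, copied verbatim.
SUBMIT_ARGS = (
    "--executor-cores 5 --driver-cores 5 --executor-memory 44g "
    "--driver-memory 44g --num-executors 9 "
    "--conf spark.yarn.executor.memoryOverhead=11g "
    "--conf spark.dynamicAllocation.enabled=false"
)


def parse_submit_args(args: str) -> dict:
    """Collect `--flag value` and `--conf key=value` pairs into one dict."""
    tokens = shlex.split(args)
    settings = {}
    # Every option here takes exactly one value, so walk the tokens in pairs.
    for flag, value in zip(tokens[0::2], tokens[1::2]):
        if flag == "--conf":
            key, _, conf_value = value.partition("=")
            settings[key] = conf_value
        else:
            settings[flag.lstrip("-")] = value
    return settings


def gigabytes(size: str) -> int:
    """Convert a size such as '44g' to an integer number of gigabytes."""
    if not size.lower().endswith("g"):
        raise ValueError(f"expected a size in gigabytes, got {size!r}")
    return int(size[:-1])


settings = parse_submit_args(SUBMIT_ARGS)
num_executors = int(settings["num-executors"])

# Each executor container is executor memory plus memory overhead.
per_executor_gb = gigabytes(settings["executor-memory"]) + gigabytes(
    settings["spark.yarn.executor.memoryOverhead"]
)
total_executor_gb = per_executor_gb * num_executors
total_executor_cores = int(settings["executor-cores"]) * num_executors

print(per_executor_gb, total_executor_gb, total_executor_cores)  # 55 495 45
```

At 55G per executor, only one executor fits on a 64 GB node, so 9 executors occupy 9 nodes and leave room elsewhere on the cluster for the driver.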
question from:https://stackoverflow.com/questions/65866586/optimizing-spark-resources-to-avoid-memory-and-space-usage


1 Answer

You can try any of the following steps:

  1. Memory overhead should be 10% of the executor memory or 384 MB, whichever is larger. Don't increase it to an arbitrary value.
  2. Remove the driver-cores setting.
  3. If you have 10 nodes, then specify the number of executors. Calculate it so that you leave some room for YARN and background processes. You can also try adding 1 or 2 more cores.
  4. Run it in cluster mode, and add 1 to whatever number of executors you assign, since in cluster mode the driver takes up one container on the cluster, like an executor.
  5. Finally, look at the code you wrote to process that 190 GB of data. Go through it and find ways to optimize it. Look for collect() calls and unnecessary joins, coalesce, or repartition operations, and find alternatives where they aren't needed.
  6. Use persist() with the MEMORY_AND_DISK storage level for the DataFrames that you use frequently in the code.
  7. The last thing I tried was executing the steps manually in spark-shell on EMR, which shows you which part of the code takes the most time to run.
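Steps 1, 3 and 4 come down to arithmetic, sketched below in Python. The overhead rule follows Spark's documented default (10% of executor memory, with a 384 MB minimum). The executor count is only a rule of thumb: reserving one vCPU per node for YARN and background processes, counting only the 10 TASK nodes, and both function names are assumptions made for illustration. The 8 vCPUs per node is the r5a.2xlarge instance size.

```python
MIN_OVERHEAD_MB = 384  # Spark's documented minimum for executor memory overhead


def default_memory_overhead_mb(executor_memory_mb: int) -> int:
    """Default overhead: 10% of executor memory, but never below 384 MB."""
    return max(executor_memory_mb // 10, MIN_OVERHEAD_MB)


def executors_to_request(nodes: int, vcpus_per_node: int, executor_cores: int) -> int:
    """Rule-of-thumb executor count for a cluster-mode job.

    Assumption: one vCPU per node is left for YARN and background
    processes, and one executor-sized slot is left for the driver.
    """
    executors_per_node = (vcpus_per_node - 1) // executor_cores
    total_slots = nodes * executors_per_node
    return total_slots - 1  # leave one slot for the driver


# 40g executor from the question: the default overhead would be 4096 MB.
print(default_memory_overhead_mb(40 * 1024))  # 4096

# A small 2g executor falls back to the 384 MB minimum.
print(default_memory_overhead_mb(2 * 1024))  # 384

# 10 TASK nodes, 8 vCPUs each, 5 cores per executor.
print(executors_to_request(nodes=10, vcpus_per_node=8, executor_cores=5))  # 9
```

Under these assumptions the count comes out to 9 executors, which happens to match the --num-executors value the asker ended up with.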

You can also refer to this official blog for some of the tips.

