A few key points to remember while building Spark applications to optimize performance:
- Spark UI (Monitor and Inspect Jobs).
- Level of Parallelism (clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions of an input file according to its size, and for distributed shuffles such as groupByKey and reduceByKey it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument to an operation; see the sketch below. In general, 2–3 tasks per CPU core in your cluster are recommended. That said, having tasks that are too small is also not advisable, as there is some overhead paid to schedule and run a task. As a rule of thumb, tasks should take at least 100 ms to execute).
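A minimal sketch of both knobs; the partition count of 48 and the input path are illustrative, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallelism-example")
  // Default number of partitions used for shuffles when none is given explicitly.
  .set("spark.default.parallelism", "48")   // e.g. roughly 2-3x the total cores

val sc = new SparkContext(conf)

val words = sc.textFile("hdfs:///data/words.txt")   // partition count follows the input splits
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1))

// The level of parallelism can also be passed as the second argument to the shuffle operation.
val counts = words.reduceByKey(_ + _, 48)
```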
- Reduce working set size (operations like groupByKey can fail badly when their working set is huge. The best way to deal with this is to increase the level of parallelism).
- Avoid groupByKey for associative operations (use operations that can combine map-side, such as reduceByKey or aggregateByKey; see the sketch below).
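A minimal sketch of the difference, assuming an existing SparkContext `sc`:

```scala
import org.apache.spark.rdd.RDD

val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// groupByKey shuffles every value across the network before summing on the reducer side.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values map-side first, so far less data crosses the network.
val viaReduce = pairs.reduceByKey(_ + _)
```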
- Multiple disks (give Spark multiple disks for intermediate persistence; this is configured through the cluster’s resource manager, see the note below).
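As a sketch: in standalone or Mesos mode the relevant setting is spark.local.dir, which takes a comma-separated list of directories (ideally one per physical disk) used for shuffle spill and intermediate data; on YARN the node manager’s local directories take precedence, so the value usually belongs in the cluster or spark-defaults configuration rather than in application code. The paths below are illustrative.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("multi-disk-example")
  // One directory per physical disk, so spills and shuffle files are spread across spindles.
  .set("spark.local.dir", "/disk1/spark,/disk2/spark,/disk3/spark")
```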
- Degree of Parallelism (~2 to 3 times the number of cores on worker nodes).
- Performance due to chosen Language (Scala > Java >> Python > R)
- Higher-level APIs are better (use DataFrames for core processing, MLlib for machine learning, Spark SQL for queries, and GraphX for graph processing).
- Avoid collecting large RDDs (use take or takeSample).
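A small sketch, assuming an existing SparkContext `sc`; the RDD is a stand-in:

```scala
val bigRdd = sc.parallelize(1 to 1000000)

// bigRdd.collect() would pull every element onto the driver and can cause OOM.
// Inspect a bounded number of elements instead:
val firstFew = bigRdd.take(20)
val sample   = bigRdd.takeSample(withReplacement = false, num = 100, seed = 42L)
```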
- Use DataFrames (more efficient; they go through the Catalyst optimizer).
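A minimal DataFrame sketch; the Parquet path and column names are illustrative. explain() shows the plan Catalyst produced, including pushed-down filters and pruned columns:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("df-example").getOrCreate()

val events = spark.read.parquet("hdfs:///data/events")

val daily = events
  .filter(col("status") === "OK")   // Catalyst can push this filter into the Parquet scan
  .groupBy("date")
  .count()

daily.explain()
```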
- Use the provided scope in Maven to avoid packaging dependencies that the cluster already supplies (e.g. the Spark jars).
- Filter First, Shuffle next
- Cache after hard work
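Putting the last two points together, a sketch assuming an existing SparkContext `sc`; the LogLine record and the cutoff value are made up for illustration:

```scala
import org.apache.spark.rdd.RDD

case class LogLine(userId: String, timestamp: Long)

val logs: RDD[LogLine] = sc.parallelize(Seq(
  LogLine("u1", 1700000000L), LogLine("u2", 1500000000L), LogLine("u1", 1710000000L)))
val cutoff = 1600000000L

// Filter first: drop unneeded rows before the shuffle so less data is moved.
val byUser = logs
  .filter(_.timestamp >= cutoff)
  .map(l => (l.userId, 1))
  .reduceByKey(_ + _)

// Cache after the hard work: persist the expensive result before reusing it in several actions.
byUser.cache()
val totalUsers = byUser.count()
val topTen     = byUser.top(10)(Ordering.by(_._2))
```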
- Spark Streaming — enable back pressure (this throttles the rate at which records are consumed from Kafka when processing time exceeds the batch interval and the scheduling delay keeps increasing).
- If using Kafka, choose the Direct Kafka (receiver-less) approach (see the sketch below).
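A minimal sketch of both points using the spark-streaming-kafka-0-10 direct integration; the broker addresses, group id, topic name, batch interval and rate cap are all illustrative:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf()
  .setAppName("kafka-direct-example")
  // Back pressure: Spark throttles the ingestion rate when batches start lagging.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional cap on the per-partition rate for the direct Kafka approach.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092,broker2:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "example-group",
  "auto.offset.reset"  -> "latest"
)

// Direct (receiver-less) stream: one RDD partition per Kafka partition.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams))

stream.map(_.value()).count().print()
ssc.start()
ssc.awaitTermination()
```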
- Extend the Catalyst Optimizer to add/modify rules (see the sketch below).
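One way to hook in extra rules is the experimental extraOptimizations seq on SparkSession; the rule below is a do-nothing placeholder that only logs the plan, just to show the shape of a custom rule:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

val spark = SparkSession.builder().appName("catalyst-rule-example").getOrCreate()

// A real rule would pattern-match on plan nodes and return a rewritten plan.
object LogPlanRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"Optimizing plan:\n$plan")
    plan
  }
}

// Register the extra rule with the (experimental) optimizer hook.
spark.experimental.extraOptimizations = Seq(LogPlanRule)
```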
- Improve Shuffle Performance:
a. Enable LZF or Snappy compression for shuffle output.
b. Enable Kryo serialization.
c. Keep shuffle data small (use reduceByKey or filter before the shuffle).
d. No shuffle block can be larger than 2 GB; otherwise the job fails with a "size exceeds Integer.MAX_VALUE" error, because Spark uses a ByteBuffer for shuffle blocks and ByteBuffer is limited to Integer.MAX_VALUE bytes (~2 GB). Ideally, each partition should hold roughly 128 MB.
e. Think about partitioning/bucketing ahead of time.
f. Do as much as possible with a single shuffle: use cogroup instead of chaining flatMap, join and groupBy.
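A sketch of points a, b and f; the configuration values are examples rather than universal recommendations, and a SparkContext is built from the conf for the cogroup part:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning-example")
  .set("spark.shuffle.compress", "true")                                  // a. compress shuffle output
  .set("spark.io.compression.codec", "snappy")                            // a. snappy (or lzf / lz4)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // b. Kryo

val sc = new SparkContext(conf)

// f. A single cogroup shuffles each side once, instead of a join followed by a groupBy.
val orders  = sc.parallelize(Seq((1, "order-a"), (2, "order-b")))
val returns = sc.parallelize(Seq((1, "return-x")))
val merged  = orders.cogroup(returns)   // RDD[(Int, (Iterable[String], Iterable[String]))]
```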
- Spend time reading the RDD lineage graph (a handy way is to print rdd.toDebugString; see the sketch below).
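For example, assuming an existing SparkContext `sc`:

```scala
val rdd = sc.parallelize(1 to 100)
  .map(x => (x % 10, x))
  .reduceByKey(_ + _)

// Prints the lineage; each indentation level marks a stage boundary (a shuffle).
println(rdd.toDebugString)
```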
- Optimize join performance:
a. Use salting to avoid skewed keys. Skewed datasets are the ones where data is not distributed evenly, i.e. one or a few partitions hold a huge amount of data compared to the other partitions. Change the regular key to concat(regular key, ":", random number); then do the join on the salted keys first, and afterwards reduce the result back onto the unsalted keys (see the sketch below).
b. Use partitionBy(new HashPartitioner(n)) to control how the join is partitioned.
- Use caching (instead of MEMORY_ONLY, use MEMORY_ONLY_SER; this gives better GC behavior for larger datasets).
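A sketch of salting a skewed join, assuming an existing SparkContext `sc`; the key names, salt bucket count and partition counts are illustrative. The small side is replicated once per salt value so every salted key still finds a match:

```scala
import scala.util.Random
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

val saltBuckets = 10

// Left side is skewed: salt the key with a random suffix to spread one hot key
// over several partitions.
val skewed: RDD[(String, Int)] = sc.parallelize(Seq.fill(1000)(("hotKey", 1)))
val saltedLeft = skewed.map { case (k, v) =>
  (s"$k:${Random.nextInt(saltBuckets)}", v)
}

// Right side is small: replicate each row once per salt value.
val lookup: RDD[(String, String)] = sc.parallelize(Seq(("hotKey", "metadata")))
val saltedRight = lookup.flatMap { case (k, v) =>
  (0 until saltBuckets).map(i => (s"$k:$i", v))
}

// Join on the salted keys, then strip the salt to recover the original key.
val joined = saltedLeft
  .partitionBy(new HashPartitioner(saltBuckets * 4))   // b. explicit hash partitioning
  .join(saltedRight)
  .map { case (saltedKey, value) => (saltedKey.split(":")(0), value) }

// Cache the expensive join result in serialized form for better GC behavior.
joined.persist(StorageLevel.MEMORY_ONLY_SER)
```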
- Always cache after repartition.
- A map after partitionBy will lose the partitioner information. Use mapValues instead (see the sketch below).
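A small sketch of both points, assuming an existing SparkContext `sc`:

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new HashPartitioner(8))

// map() may change the key, so Spark drops the partitioner.
val remapped = pairs.map { case (k, v) => (k, v.toUpperCase) }
println(remapped.partitioner)   // prints None

// mapValues() cannot change the key, so the partitioner is preserved.
val upper = pairs.mapValues(_.toUpperCase)
println(upper.partitioner)      // prints Some(org.apache.spark.HashPartitioner@...)

// repartition() triggers a full shuffle; cache the result so that shuffle is paid only once.
val widened = pairs.repartition(16).cache()
```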
- Speculative Execution (Enable Speculative execution to tackle stragglers)
- Coalesce or repartition to avoid massive partitions (smaller partitions work better)
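A sketch covering the last two points; the partition counts are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("speculation-example")
  // Re-launch slow copies of tasks (stragglers); the first copy to finish wins.
  .set("spark.speculation", "true")

val sc = new SparkContext(conf)

// After a selective filter many partitions may be nearly empty:
// coalesce() shrinks the partition count without a full shuffle.
val filtered = sc.parallelize(1 to 1000000, 200).filter(_ % 1000 == 0)
val compact  = filtered.coalesce(20)

// repartition() does a full shuffle and can grow or shrink the partition count;
// use it to break up a few massive partitions.
val spread = compact.repartition(40)
```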
- Use Broadcast variables
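A minimal sketch, assuming an existing SparkContext `sc`; the lookup table is a stand-in:

```scala
// Ship a read-only lookup table to every executor once, instead of
// capturing it in each task's closure.
val countryNames = Map("IN" -> "India", "US" -> "United States")
val bcCountries  = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("IN", "US", "IN"))
val named = codes.map(code => bcCountries.value.getOrElse(code, "unknown"))
```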
- Use Kryo serialization (more compact and faster than Java serialization. Note that Kryo is only used for RDD caching and shuffling, not for serialize-to-disk operations like saveAsObjectFile).
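A sketch of enabling Kryo and registering application classes so Kryo can write a small numeric id instead of the full class name; the Click class is hypothetical:

```scala
import org.apache.spark.SparkConf

case class Click(userId: String, url: String)

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Register the classes that will actually flow through caching and shuffles.
  .registerKryoClasses(Array(classOf[Click], classOf[Array[Click]]))
```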
I guess Java and Scala would have equal performance, so it should be something like "Java = Scala >> Python > R". Can you share more on how you came to the conclusion that "Scala > Java"?