Spark Optimization:
Spark optimization is one of the most important concepts. It is the developer's responsibility to design a Spark application in an optimized manner so that we can take full advantage of the Spark execution engine. When an application is not optimized, even simple code takes longer to execute, resulting in performance lags and downtime, and it also affects other applications that are using the same cluster.
There are two ways to optimize a Spark application:
1). Resource/Cluster level Optimization
2). Code Level / Application Level Optimization
If the developer writes optimized code but the application does not have enough resources to run it, there is no point in the optimized code, as it will still take a lot of time due to the lack of resources.
Spark Architecture:
Before diving deep into Spark resource-level optimization, let's first try to understand the Spark architecture, so it will be easy to understand the process of resource-level optimization.
Apache Spark follows a master/slave architecture with two
main daemons and a cluster manager –
a).Master Daemon — (Master/Driver Process)
b).Worker Daemon –(Slave Process)
c).Cluster Manager
A Spark cluster has a Master node and Slave/Worker nodes. The driver and the executors run as individual Java processes, and users can run them on the same machine or on separate machines across the cluster.
Master Node or driver process: The master is the driver that runs the main() program where the SparkContext is created. It then interacts with the cluster manager to schedule the job execution and perform the tasks.
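To make the driver's role concrete, below is a minimal sketch of a driver program whose main() creates the SparkSession (and with it the SparkContext); the master URL, application name, and input path are assumptions used only for illustration.

import org.apache.spark.sql.SparkSession

object WordCountDriver {
  def main(args: Array[String]): Unit = {
    // The driver's main() creates the SparkSession, which wraps the SparkContext.
    // "yarn" as master and the HDFS path are assumptions for this sketch.
    val spark = SparkSession.builder()
      .appName("example-driver")
      .master("yarn")
      .getOrCreate()
    val sc = spark.sparkContext

    // Work defined here is scheduled by the driver through the cluster manager
    // and executed by the executors running on the worker nodes.
    val counts = sc.textFile("hdfs:///data/input.txt")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.take(10).foreach(println)
    spark.stop()
  }
}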
Slave/Worker Nodes: The workers consist of processes that can run in parallel to perform the tasks scheduled by the driver program. These processes are called Executors/Containers.
The role of worker nodes/executors:
1. Perform the data processing for the application code
2. Read data from and write data to external sources
3. Store the computation results in memory or on disk.
Executors/Containers contain cores and RAM. Executors/Containers are launched once at the beginning of a Spark application and then run for the entire lifetime of the application. All the cores and the total memory are divided among the executors. One node can hold one or more executors. The individual tasks in a given Spark job run in the Spark executors/containers.
Executors/Containers are a combination of memory (RAM) and cores. The number of cores decides the number of parallel processes in each executor. So, for example, if we have 5 cores in one executor, then at most 5 parallel tasks can execute in that executor.
Tasks: A task is the smallest execution unit in Spark. A task in Spark executes a series of instructions; for example, reading data, filtering it, and applying map() to it can be combined into one task. Tasks are executed inside an executor. Based on the HDFS block size (128 MB), Spark will create partitions and assign one task per partition. So if we have 1 GB (1024 MB) of data, Spark will create 1024 MB / 128 MB = 8 partitions.
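As a rough illustration of this, the sketch below reads a file of about 1 GB (the path is hypothetical) and prints the number of partitions Spark created; with the default 128 MB block size this should be around 8.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-demo").getOrCreate()

// With the default HDFS block size of 128 MB, a ~1 GB file is split into
// roughly 8 partitions, and Spark schedules one task per partition.
val rdd = spark.sparkContext.textFile("hdfs:///data/one_gb_file.txt")
println(s"Partitions: ${rdd.partitions.length}")   // expect about 8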
Stage: A group of tasks creates a stage. The number of stages depends on data shuffling (narrow and wide transformations). When Spark encounters a function that requires a shuffle, it creates a new stage. Wide transformation functions like reduceByKey(), join(), etc. will trigger a shuffle and result in a new stage.
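A small sketch of how a wide transformation introduces a new stage; the input path is hypothetical, and toDebugString just prints the RDD lineage so the shuffle boundary is visible.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stage-demo").getOrCreate()
val sc = spark.sparkContext

// flatMap and map are narrow transformations and stay within one stage;
// reduceByKey needs a shuffle, so Spark starts a new stage at that point.
val counts = sc.textFile("hdfs:///data/one_gb_file.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// toDebugString prints the lineage; the ShuffledRDD marks the stage boundary.
println(counts.toDebugString)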
Job: A group of stages creates a job.
How to Decide Cluster Configuration
Let's assume we have a 10-node cluster with:
16 CPU cores on each node
64 GB RAM on each node
Now we can decide the number of executors based on the number of CPU cores. Each executor can hold a minimum of 1 core, and the maximum is the number of CPU cores available on a node.
If we consider the minimum value, i.e. one core per executor:
In that case we will have 16 executors per node with 1 core each, and each executor will have 64 GB / 16 = 4 GB RAM.
As we discussed earlier, the number of cores decides the number of parallel tasks, so in this case we cannot execute more than one task at a time in an executor.
Also, if we create any broadcast or accumulator variable, a copy of it will be created on each executor, as shown in the sketch below.
So it is not a good idea to assign one core per executor. This approach is also called the tiny executor approach.
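As noted above, one read-only copy of a broadcast variable is shipped to every executor, so many tiny executors mean many copies. A minimal sketch, using a made-up lookup table:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-demo").getOrCreate()
val sc = spark.sparkContext

// One copy of the broadcast value lives on each executor, so 16 one-core
// executors on a node mean 16 copies on that node.
val countryCodes = Map("IN" -> "India", "US" -> "United States")   // made-up lookup data
val bcCodes = sc.broadcast(countryCodes)

val resolved = sc.parallelize(Seq("IN", "US", "IN"))
  .map(code => bcCodes.value.getOrElse(code, "Unknown"))

resolved.collect().foreach(println)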
Let's discuss the 2nd approach, the other extreme, where we create a single executor per node and assign all 16 cores to that one executor. So we will have one executor with 16 cores and 64 GB RAM.
In this case we will have 16 parallel tasks per executor.
However, it has been observed that if we execute more than 5 parallel tasks, or have more than 5 cores per executor, then HDFS throughput suffers.
Also, if an executor holds a very large amount of memory (64 GB), then garbage collection takes a lot of time.
So this is also not a good cluster configuration. This approach is also called the FAT executor approach.
So neither the tiny nor the FAT executor is a good cluster configuration. We should always take a balanced approach, so that we can use our cluster effectively.
Let's try to configure an optimized cluster.
Resources
1. Total Number Of Nodes : 10
2. Cores Per Node : 16
3. RAM Per Node : 64 GB
So, from the above resources, we need to give 1 core and 1 GB RAM per node for the OS and other activities.
So now we are left with:
1. Total Number Of Nodes : 10
2. Cores Per Node : 15
3. RAM Per Node : 63 GB
As per the studies, 5 cores per executor is the best and preferred choice.
With 15 cores and 63 GB RAM on each machine, we can have 15 / 5 = 3 executors per machine.
Memory: 63 / 3 = 21 GB per executor.
1 Node → 3 Executors → 5 Cores per Executor → 21 GB RAM per Executor.
Out of this 21 GB RAM, some will go as overhead memory (off-heap):
overhead memory = max(384 MB, 7% of executor memory)
i.e. max(384 MB, 7% of 21 GB) ≈ 1.5 GB
So the remaining memory will be 21 - 1.5 ≈ 19 GB.
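The same arithmetic, written out as a small standalone sketch (the 7% overhead factor simply follows the rule of thumb above, and the figures are approximations):

// Worked example of the sizing arithmetic above (values per worker node).
object ClusterSizing {
  def main(args: Array[String]): Unit = {
    val coresPerNode  = 16 - 1                                 // 1 core reserved for the OS -> 15
    val ramPerNodeGb  = 64 - 1                                 // 1 GB reserved -> 63 GB
    val coresPerExec  = 5                                      // rule of thumb: ~5 cores per executor

    val execsPerNode  = coresPerNode / coresPerExec            // 15 / 5 = 3
    val memPerExecGb  = ramPerNodeGb.toDouble / execsPerNode   // 63 / 3 = 21 GB

    // overhead = max(384 MB, 7% of executor memory)
    val overheadGb    = math.max(0.384, 0.07 * memPerExecGb)   // ~1.47 GB
    val heapPerExecGb = memPerExecGb - overheadGb              // ~19.5 GB, i.e. about 19 GB

    println(f"executors/node = $execsPerNode, heap/executor = $heapPerExecGb%.1f GB")
  }
}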
So the calculation will be:
10-node cluster
Each node has 3 executors: 10 * 3 = 30 executors
So we have 30 executors, with each executor holding 5 CPU cores and 19 GB memory.
Now, out of these 30 executors, 1 executor will be given to the YARN Application Master.
So now our final cluster configuration will look like below:
Number Of Nodes : 10
Total Number Of Executors : 29
Cores Per Executor : 5
RAM Per Executor : 19 GB
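A sketch of how this final configuration could be expressed when building the SparkSession; the ~2 GB overhead value approximates the max(384 MB, 7%) rule above, and spark.executor.memoryOverhead is the property name in Spark 2.3+ (older YARN deployments use spark.yarn.executor.memoryOverhead).

import org.apache.spark.sql.SparkSession

// Sketch of the final configuration above: 29 executors, 5 cores each,
// ~19 GB heap plus roughly 2 GB overhead per executor (values are approximate).
val spark = SparkSession.builder()
  .appName("optimized-cluster-demo")
  .config("spark.executor.instances", "29")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "19g")
  .config("spark.executor.memoryOverhead", "2g")
  .getOrCreate()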