Performance
Before moving on to the rest of the chapters, which cover the functional areas of Apache Spark and its extensions, we will examine the area of performance. What issues and areas need to be considered? What might impact Spark application performance, starting at the cluster level and finishing with actual Scala code? We don't want to just repeat what the Spark website says, so take a look at this URL: http://spark.apache.org/docs/<version>/tuning.html.
Here, <version> relates to the version of Spark that you are using; that is, either the latest or something like 1.6.1 for a specific version. So, having looked at that page, we will briefly mention some of the topic areas. We will list some general points in this section without implying an order of importance.
The cluster structure
The size and structure of your big data cluster are going to affect performance. If you have a cloud-based cluster, your IO and latency will suffer in comparison to an unshared hardware cluster. You will be sharing the underlying hardware with multiple customers, and the cluster hardware may be remote. There are some exceptions to this. The IBM cloud, for instance, offers dedicated bare metal high-performance cluster nodes with an InfiniBand network connection, which can be rented on an hourly basis.
Additionally, the positioning of cluster components on servers may cause resource contention. For instance, think carefully about locating Hadoop NameNodes, Spark servers, Zookeeper, Flume, and Kafka servers in large clusters. With high workloads, you might consider segregating servers to individual systems. You might also consider using an Apache system such as Mesos that provides better distribution and assignment of resources to the individual processes.
Consider potential parallelism as well. The greater the number of workers in your Spark cluster for large Datasets, the greater the opportunity for parallelism. One rule of thumb is to use one worker per hyper-thread or virtual core.
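As a rough illustration, the following Scala sketch sizes the executors and the default parallelism for a hypothetical cluster of four workers with eight virtual cores each; the numbers are assumptions and should be derived from your own hardware:
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical cluster: 4 worker nodes with 8 virtual cores each = 32 cores in total
val conf = new SparkConf()
  .setAppName("ParallelismSizing")
  .set("spark.executor.instances", "4")    // one executor per worker node
  .set("spark.executor.cores", "8")        // one task slot per virtual core
  .set("spark.default.parallelism", "64")  // roughly 2 x total cores for RDD shuffles

val sc = new SparkContext(conf)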
Hadoop Distributed File System
You might consider using an alternative to HDFS, depending upon your cluster requirements. For instance, IBM has GPFS (General Parallel File System) for improved performance.
The reason why GPFS might be a better choice is that, coming from a high-performance computing background, this filesystem has full read-write capability, whereas HDFS is designed as a write once, read many filesystem. It offers an improvement in performance over HDFS because it runs at the kernel level, as opposed to HDFS, which runs in a Java Virtual Machine (JVM) that in turn runs as an operating system process. It also integrates with Hadoop and the Spark cluster tools. IBM runs setups with several hundred petabytes using GPFS.
Another commercial alternative is the MapR file system that, besides performance improvements, supports mirroring, snapshots, and high availability.
Ceph is an open source alternative to HDFS: a distributed, fault-tolerant, and self-healing filesystem for commodity hard drives. It also runs in the Linux kernel and addresses many of the performance issues that HDFS has. Other promising candidates in this space are Alluxio (formerly Tachyon), Quantcast, GlusterFS, and Lustre.
Finally, Cassandra is not a filesystem but a NoSQL key-value store that is tightly integrated with Apache Spark, and it is therefore regarded as a valid and powerful alternative to HDFS--or even to any other distributed filesystem--especially as it supports predicate push-down using Apache Spark SQL and the Catalyst optimizer, which we will cover in the following chapters.
Data locality
The key to good data processing performance is the avoidance of network transfers. This was very true a couple of years ago and is less relevant for tasks with high CPU demand and low I/O, but for data processing algorithms with low CPU demand and high I/O demand, it still holds.
Note
We can conclude from this that HDFS is one of the best ways to achieve data locality, as chunks of files are distributed on the cluster nodes, in most cases using hard drives directly attached to the server systems. This means that those chunks can be processed in parallel using the CPUs on the machines where the individual data chunks are located, in order to avoid network transfer.
Another way to achieve data locality is using Apache Spark SQL. Depending on the connector implementation, Spark SQL can make use of the data processing capabilities of the source engine. So, for example, when using MongoDB in conjunction with Spark SQL, parts of the SQL statement are preprocessed by MongoDB before data is sent upstream to Apache Spark.
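As a brief sketch of this idea, the following snippet reads a MongoDB collection through the MongoDB Spark connector and applies a filter that the connector can push down to the database; the connection URI, database, and collection names are assumptions, and the exact configuration keys depend on the connector version you use:
import org.apache.spark.sql.SparkSession

// Assumed URI pointing at a hypothetical shop.orders collection
val spark = SparkSession.builder()
  .appName("MongoPushdown")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017/shop.orders")
  .getOrCreate()

// Read the collection via the MongoDB Spark connector (must be on the classpath)
val orders = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

// This filter can be pushed down so that MongoDB only returns matching documents
val openOrders = orders.filter("status = 'OPEN'")
openOrders.explain()  // inspect the physical plan to verify the push-down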
Memory
In order to avoid OOM (Out of Memory) messages for the tasks on your Apache Spark cluster, please consider the following questions when tuning:
- Consider the level of physical memory available on your Spark worker nodes. Can it be increased? Check on the memory consumption of operating system processes during high workloads in order to get an idea of free memory. Make sure that the workers have enough memory.
- Consider data partitioning. Can you increase the number of partitions? As a rule of thumb, you should have at least as many partitions as you have available CPU cores on the cluster. Use the repartition function on the RDD API.
- Can you modify the storage fraction and the memory used by the JVM for storage and caching of RDDs? Workers are competing for memory against data storage. Use the Storage page on the Apache Spark user interface to see whether this fraction is set to an optimal value, and then update the following properties (a short sketch follows the property list):
spark.memory.fraction
spark.memory.storageFraction
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size
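The following is a minimal sketch of how these properties might be set through SparkConf, together with a repartition call; the fraction values, off-heap size, partition count, and input path are assumptions for illustration only:
import org.apache.spark.{SparkConf, SparkContext}

// Assumed values; tune them against the Storage page of the Spark UI
val conf = new SparkConf()
  .setAppName("MemoryTuning")
  .set("spark.memory.fraction", "0.6")            // heap share for execution and storage
  .set("spark.memory.storageFraction", "0.5")     // share of the above reserved for cached data
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2147483648") // 2 GB of off-heap memory, in bytes

val sc = new SparkContext(conf)

// Repartition so that there are at least as many partitions as available CPU cores
val lines = sc.textFile("hdfs:///data/input.csv")  // hypothetical path
val repartitioned = lines.repartition(64)          // assumed value based on the core count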
In addition, the following two things can be done in order to improve performance:
- Consider using Parquet as a storage format, which is much more storage-efficient than CSV or JSON
- Consider using the DataFrame/Dataset API instead of the RDD API as it might result in more efficient execution (more about this in the next three chapters); a brief example follows this list
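As a small illustration of both points, the sketch below reads a CSV file with the DataFrame API and writes it back as Parquet; the paths and column names are assumptions:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ParquetExample").getOrCreate()

// Read a CSV export with the DataFrame API instead of parsing lines via the RDD API
val sales = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///data/sales.csv")      // hypothetical input path

// Write the same data back as Parquet, a compressed, columnar format
sales.write.mode("overwrite").parquet("hdfs:///data/sales.parquet")

// Later queries read only the columns they need from the Parquet files
val revenueByRegion = spark.read.parquet("hdfs:///data/sales.parquet")
  .groupBy("region")
  .sum("revenue")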
Coding
Try to tune your code to improve the Spark application performance. For instance, filter your application-based data early in your ETL cycle. One example: when using raw HTML files, detag them and crop away unneeded parts at an early stage. Tune your degree of parallelism, try to find the resource-expensive parts of your code, and find alternatives.
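To illustrate the idea of filtering early, the following sketch keeps only the pages of interest and detags them before any expensive downstream work; the input path, the filter condition, and the crude detagging expression are assumptions:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("FilterEarly").getOrCreate()
import spark.implicits._

// Hypothetical raw HTML pages, one document per line
val rawPages = spark.read.textFile("hdfs:///data/raw_html/")

// Filter and detag as early as possible so that later stages shuffle and cache less data
val cleaned = rawPages
  .filter(_.contains("<article"))           // keep only the pages we actually need
  .map(_.replaceAll("<[^>]*>", " ").trim)   // crude detagging, for illustration only

cleaned.cache()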
Note
ETL is one of the first things you do in an analytics project. You grab data from third-party systems, either by directly accessing relational or NoSQL databases or by reading exports in various file formats such as CSV, TSV, JSON, or even more exotic ones, from local or remote filesystems or from a staging area in HDFS. After some inspections and sanity checks on the files, an ETL process in Apache Spark basically reads in the files and creates RDDs or DataFrames/Datasets out of them.
These are transformed so that they fit the downstream analytics application running on top of Apache Spark or other applications, and are then stored back into filesystems as JSON, CSV, or Parquet files, or even back into relational or NoSQL databases.
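A compact sketch of such an ETL round trip is shown below; the paths and column names are assumptions:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("EtlRoundTrip").getOrCreate()

// Extract: read a CSV export from a hypothetical staging area in HDFS
val raw = spark.read.option("header", "true").csv("hdfs:///staging/customers.csv")

// Transform: simple sanity checks and a column rename for the downstream application
val cleaned = raw.na.drop(Seq("customer_id"))
  .withColumnRenamed("cust_name", "customer_name")

// Load: store the result back as Parquet for the analytics application
cleaned.write.mode("overwrite").parquet("hdfs:///warehouse/customers.parquet")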
Note
Finally, I can recommend the following resource for any performance-related problems with Apache Spark: https://spark.apache.org/docs/latest/tuning.html.