Optimizing performance of Apache Spark workloads on Amazon S3

by Aritra Gupta, Alan Halcyon, and Jason Chen

This blog covers performance metrics, optimizations, and configuration tuning specific to OSS Spark running on Amazon EKS. For customers using or considering Amazon EMR on EKS, refer to the service documentation to get started and this blog post for the latest performance benchmark.


Performance is top of mind for customers running streaming, extract, transform, load (ETL), machine learning (ML), or graph processing workloads on Apache Spark and Kubernetes. Although customers typically start with the default settings of frameworks like Apache Spark and Kubernetes, their next step is to tune these workloads. This is an important step because each workload is unique and requires performance tuning to use allocated resources efficiently and avoid bottlenecks.

To run Apache Spark jobs on Kubernetes, customers commonly use Amazon Elastic Kubernetes Service (EKS), with data stored on Amazon Simple Storage Service (S3). Amazon EKS is a fully managed service that makes it easy to run Kubernetes on Amazon Web Services without needing to install, operate, and maintain a Kubernetes control plane. For data lake storage, customers typically use Amazon S3 because it’s secure, scalable, durable, and highly available.

In this post, we outline the step-by-step process to optimize Apache Spark jobs on Amazon EKS and Amazon S3. We begin by setting up a data lake with sample data and go through the tuning steps in detail. By applying these changes, we reduced the runtime of the Apache Spark job by 60% and increased CPU utilization by 30%, thereby reducing the overall cost of running the job. Although the optimizations are tailored for an Apache Spark job running on Amazon EKS, they also work with self-managed Kubernetes on Amazon Web Services, Amazon Web Services Fargate, and Apache Spark clusters using S3A or other connectors for Amazon S3.

Initial setup and observations

To emulate a typical customer workload, we set up a 130-GB data lake on Amazon S3 consisting of Parquet files with an average size of 13 GB. We read the Parquet files from Amazon S3, select a few columns, and then save the selected columns back to Amazon S3 into a destination folder. For running the Apache Spark job on Amazon EKS, we used an m5.4xlarge instance with 16 cores, 64-GB RAM, and a maximum network throughput of 10 Gbit/s. We used Apache Spark 3.4 with Hadoop 3.3.1 for our workload.

With default parameters, the job took 10 minutes to complete, with an average CPU utilization of 50%. The CPU utilization was low, so we looked for ways to improve it and bring down the job completion time in the process. A quick analysis of the Apache Spark event logs indicated that about half of the time was spent reading data from Amazon S3. Therefore, we looked at ways to optimize the time to read data from Amazon S3.

Steps to reduce data read time from Amazon S3

Our improvements focused on the following areas:

  • Adjust the data byte ranges read
  • Optimize Kubernetes Pod resources
  • Modify the Kubernetes DNS configuration

Next, we go through each of these optimizations.

1/ Adjust the data byte ranges read

First, we looked at the number of bytes read per request. This is crucial for optimizing network utilization: when data is fetched in smaller byte ranges, Apache Spark jobs typically spend more CPU cycles managing network connections instead of processing data. Although we experimented with a wide range of read sizes, we wanted to strike a balance between read size and the number of tasks running in parallel. This is important because with very large sequential file reads (1 GB+), the number of partitions (individual segments of data to be queried) dropped significantly (by more than 75%), which reduced the total throughput of the job.

To find the right size of Amazon S3 reads, we experimented with the following four parameters:

a/ Adjust parquet.block.size: A Parquet file consists of one or more Row Groups. A Row Group consists of one data chunk for every column following each other, and every data chunk consists of one or more Pages with the column data. Larger row groups allow for larger column chunks, which makes it possible to do larger sequential I/O. Larger groups also require more buffering in the write path. Conversely, smaller row groups result in smaller sequential I/O, which hurts throughput. We experimented with a larger Parquet row group size while writing the Parquet file for better network utilization in reading the data.

df.write.option("parquet.block.size", 128*1024*1024).parquet('s3a://data/file/parquet')

b/ Adjust the size of the input files to make sure the block size increase is effective, as a small input file can be a limiting factor. For example, with a block size of 128 MiB but an input file of only 64 MiB, the effective block size is only 64 MiB, because a row group cannot be larger than the file that contains it, so an individual Amazon S3 read cannot exceed 64 MiB. Therefore, the input file size must be at least as large as the block size. This can be achieved by reducing the number of partitions before saving the input files.

df.repartition(100).write.option("parquet.block.size", 128*1024*1024).parquet('s3a://data/file/parquet')

c/ Adjust spark.hadoop.parquet.read.allocation.size: This parameter specifies the buffer size used in reading Parquet files. We observed that it also limits the number of bytes transferred in each Amazon S3 request while using S3A to read from Amazon S3. The default value is 8 MB, which limits each Amazon S3 request to 8 MB. To increase throughput and enable larger sequential I/O reads, we experimented with higher values and settled on 128 MB for the best performance. Increasing the parameter beyond 128 MB reduced the total number of partitions in the job, thereby increasing the overall job runtime.

d/ Adjust spark.sql.files.maxPartitionBytes: This parameter specifies the overall data size read into a partition, and can be a limiting factor if it is configured to be lower than spark.hadoop.parquet.read.allocation.size. To enable large sequential I/O reads with faster throughput, we settled on 512 MB for the best performance.

We experimented with different combinations of the previously mentioned parameters, and ended up with the following values for our workload:

  • parquet.block.size = 512 MB
  • Input file size = 13 GB
  • parquet.read.allocation.size = 128 MB
  • maxPartitionBytes = 512 MB
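The settings above can be sketched as plain configuration values. The dictionary keys below are the standard Spark and Hadoop property names discussed in this section; pass them via --conf flags or SparkSession.builder.config when building the session:

```python
# Final read-path tuning values from the experiments above, expressed as
# Spark configuration properties. These values worked for our 130-GB data
# lake; tune them against your own workload.
MB = 1024 * 1024

# Read-side Spark properties (pass via --conf or SparkSession.builder.config).
read_tuning = {
    # Buffer size for Parquet reads; with S3A this also caps the bytes
    # transferred per Amazon S3 request.
    "spark.hadoop.parquet.read.allocation.size": str(128 * MB),
    # Upper bound on the data packed into a single input partition.
    "spark.sql.files.maxPartitionBytes": str(512 * MB),
}

# Row-group size is a write-side option, set when producing the Parquet files.
write_options = {"parquet.block.size": str(512 * MB)}
```

Applying read_tuning when building the SparkSession and write_options on the DataFrame writer reproduces the values in the list above.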

With this tuning, the average throughput increased by 77% and the job time reduced by 40%, from 10 minutes to 6 minutes. The overall CPU utilization also increased from 50% to 65%.

Next, we look at allocating the right number of CPU cores for our Apache Spark job.

2/ Optimize Kubernetes Pod resources

Getting to the right number of CPU requests on Kubernetes takes some experimentation. When we observe CPU utilization to be lower than allocation, we consider resizing the container down, as computing power is being wasted. However, if the Pod CPU limits are too restrictive, workloads can observe CPU throttling. This means the jobs take longer to complete. Therefore, we adjust the number of vCPU cores allocated to improve performance or cut cost. To achieve this, we reviewed the performance metrics of our Apache Spark job to make sure that the vCPUs requested most accurately reflect the actual resource utilization we observed from our applications. This allowed Kubernetes to pack more replicas of the application onto a node, increasing the resource utilization per node and reducing the overall node count.

Here are the steps we followed to identify the right number of vCPUs for our data lake:

Step 1: Identify the resources that the application is currently using. This can be done by measuring the average CPU usage, CPU throttling, and the maximum memory usage under normal load. The kubernetes-mixin dashboards installed with the kube-prometheus-stack can display the usage per pod, and the usage on the node. You can also refer to this Amazon Web Services post on using Prometheus for more details on CPU throttling and related metrics.

Step 2: Find the maximum performance of the application without limits defined. This gives us the usage during the “best case scenario” for the pod, where it gets all the resources it needs. To achieve this, we removed the limits in the pod specification, and re-ran our Apache Spark executors on empty Kubernetes worker nodes.

Step 3: Calculate the average CPU usage in CPU units and the maximum memory usage at peak load. These values represent what a single executor pod needs when pushed to its peak.
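As a sketch of this calculation with hypothetical measurements (the sample values below are illustrative, not our actual metrics), the request values can be derived like this:

```python
# Illustrative Step 3 calculation: average CPU usage drives the vCPU request;
# peak memory plus a buffer drives the memory request and limit.
# The samples below are hypothetical values, not our measured metrics.
cpu_samples = [0.62, 0.81, 0.74, 0.78, 0.80]   # per-pod CPU usage in cores
mem_samples_mib = [3100, 3600, 3450, 3900]     # per-pod memory usage in MiB

cpu_request = round(sum(cpu_samples) / len(cpu_samples), 2)  # average usage
mem_limit_mib = round(max(mem_samples_mib) * 1.2)            # peak + 20% buffer
```

The 20% memory buffer is one reasonable choice for the out-of-memory headroom mentioned in Step 4; size it to your workload's variance.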

Step 4: Based on the calculations in Step 3, we configured the following:

  • vCPU Requests to the average CPU usage.
  • Memory request and Limit to the maximum memory usage, as well as a buffer to avoid running out of memory.

We did not set a vCPU limit, and we ran our Apache Spark job again to verify the resource usage at peak load.
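Assuming Spark's native Kubernetes resource properties are used to apply this sizing (these are standard spark.kubernetes.* and spark.executor.* settings; the memory figures are illustrative, not our measured values), the resulting executor configuration can be sketched as:

```python
# Executor resource settings reflecting Step 4: request 0.75 vCPU per pod and
# set no CPU limit (spark.kubernetes.executor.limit.cores is deliberately
# omitted, so executors can burst into idle CPU on the node). Memory values
# are illustrative; derive yours from peak usage under load plus a buffer.
executor_resources = {
    "spark.kubernetes.executor.request.cores": "0.75",  # vCPU request per pod
    "spark.executor.memory": "4g",          # heap sized to observed peak
    "spark.executor.memoryOverhead": "1g",  # off-heap headroom for the pod
}
```

Leaving the CPU limit unset trades strict isolation for higher node utilization, which matched our goal of reducing idle CPU.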

Through this process we adjusted the vCPU Requests from 4 cores for each pod to 0.75 cores and removed the vCPU limit. This change reduced CPU idle time by sharing CPU across pods, increasing utilization to 75%, and reduced the job run time by roughly 16%, from 6 minutes to 5 minutes. To further improve the overall throughput, we also looked at the DNS configuration of our Apache Spark job.

3/ Modify the Kubernetes DNS configuration

By analyzing the network traffic, we identified that 35+% of all packets were DNS lookups to non-existent hostnames, which contributed to network delays. Each Amazon S3 GET request resulted in five DNS lookups, four of which had to fail before the final lookup succeeded. These additional DNS lookups added wait time to the Apache Spark job. The following is a list of the non-existent hostnames, with the real address in the last row.

amazon.com.default.svc.cluster.local
amazon.com.svc.cluster.local
amazon.com.cluster.local
amazon.com.ec2.internal
amazon.com.

Kubernetes creates DNS records for Services and Pods. By default, Kubernetes is configured without DNS caching and with the number of dots, referred to as ndots, set to five. The ndots value sets a threshold for the number of dots that must appear in a name before an initial absolute query is made. We tuned the ndots value configured in Kubernetes to optimize for faster and more efficient DNS resolution.

nameserver 10.100.0.10
search default.svc.cluster.local svc.cluster.local cluster.local ec2.internal
options ndots:5

For our workload, Kubernetes was doing the service lookup to Amazon S3, an external service. Therefore, we wanted Apache Spark to lookup only valid Amazon S3 DNS names. We found that reducing the ndots value to two resulted in Apache Spark doing a single DNS lookup, instead of a series of failed requests. We configured the ndots value in the dnsConfig of the Kubernetes deployment:

spec:
  dnsConfig:
    options:
      - name: ndots
        value: "2"
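To see why a lower ndots value avoids the failed lookups, here is a simplified Python model of the resolver's search behavior (a hypothetical helper for illustration, not CoreDNS code): names with at least ndots dots are tried as absolute names first, while names with fewer dots walk the search list first.

```python
# Simplified model of resolv.conf search behavior (illustrative, not CoreDNS
# code): if a name contains at least `ndots` dots, the absolute name is
# queried first; otherwise every search-domain suffix is tried before the
# absolute name, producing the failed lookups shown earlier.
def lookup_order(name, search_domains, ndots):
    absolute = name.rstrip(".") + "."
    suffixed = [f"{name}.{d}" for d in search_domains]
    if name.count(".") >= ndots:
        return [absolute] + suffixed
    return suffixed + [absolute]

search = ["default.svc.cluster.local", "svc.cluster.local",
          "cluster.local", "ec2.internal"]

# With ndots:5, an endpoint such as s3.amazonaws.com (two dots) walks all
# four cluster search domains before the real lookup succeeds; with ndots:2,
# the same name is resolved with a single absolute query first.
```

Note that Amazon S3 endpoint hostnames contain at least two dots, which is why ndots:2 is enough to trigger the absolute query first for this workload.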

To verify the effect of the ndots settings before and after our changes, we looked at the logs of the CoreDNS pods. First, we turned on verbose logging in the CoreDNS deployment by editing its ConfigMap with the following command:

kubectl edit configmap coredns -n kube-system

We added a line with log to the Corefile in the ConfigMap and saved the edits. Here’s the resulting configuration:

apiVersion: v1
data:
  Corefile: |
    .:53 {
        log
        errors
        health
        kubernetes cluster.local in-addr.arpa ip6.arpa {
          pods insecure
          fallthrough in-addr.arpa ip6.arpa
        }
...

Then, we checked the DNS logs using this command: kubectl logs coreDNS_pod_name -n kube-system.

For our sample workload, setting the ndots value to two increased the average throughput by 5%. Although the improvement is marginal in our experiment, customers have seen up to 30% throughput improvements through this configuration. With this final optimization, our job run time came down to 5 minutes, CPU utilization reached 75%, and the overall throughput increased by 82%.

Monitor and verify changes to the workload

It is essential to verify the change of each parameter that you fine-tune in your workload. This is important, as the same settings may impact each workload differently due to the exact environment of the workload. Here is our recommended list of ways to monitor and verify changes in your Apache Spark job:

  • Check the job running time and data scan time in the Apache Spark event logs, which are available from the Apache Spark Web UI.
  • Monitor network usage, cluster CPU utilization, and error rates using Amazon Web Services or open source observability tools.
  • Enable Amazon S3 access logs and check the request size and throughput. For more details, you can refer to the Amazon S3 User Guide.

Conclusion

In this post, we showcased three techniques to optimize Amazon S3 performance for Apache Spark workloads on Amazon EKS: we adjusted the data byte ranges read, optimized Kubernetes Pod resources, and modified the Kubernetes DNS configuration. Through our performance tuning steps, we halved the job running time from 10 minutes to 5 minutes, increased CPU utilization from 50% to 75%, and increased the overall throughput by 82%. These optimizations helped reduce the time to run the Apache Spark job. Adopting these techniques can help generate insights from data quicker, and lower the total cost of the workload.

Let us know in the comments if these techniques helped you optimize your workloads.