
Introduction of S3A with Ceph* for Big Data Workloads

By Hualong Feng, Vivian Zhu on Mar 04, 2021

Introduction

In today's world, data is king. Big data processing platforms such as Spark* and Hadoop* traditionally rely on the HDFS distributed file system. In the early stage of data accumulation, we may use a centralized storage solution such as Ceph to store data, which makes the data easier to manage and to scale. However, when we use Spark or Hadoop, we would like to access that centralized storage directly to avoid moving the data. This article uses Ceph as the centralized storage backend for big data processing platforms such as Spark. Ceph, a leading open-source software-defined storage application, provides excellent scalability and centrally-managed clusters with file, object, and block interfaces. It can be accessed by multiple applications through the S3 HTTP RESTful API via RGW. S3A, an emerging standard, is the adaptation layer that converts the S3 API to the HDFS API, allowing big data platforms to interface seamlessly with a Ceph storage solution.

This article illustrates how to use Ceph as the backend storage for Spark-engine big data workloads. By the end of this article, you will know how to use S3A in an existing big data platform and understand the basic implementation of S3A for integrating with Ceph storage.

Architecture and Commit Protocol

Now, let's take a look at the position of S3A in the big data computing platform and its implementation.

Figure 1. Setup Architecture

Figure 1 illustrates the setup architecture used in this article:

  • The Hadoop MapReduce engine generates the data used in the test
  • Spark is the computing engine that executes the actual workload
  • Hadoop Yarn is responsible for resource management and job scheduling and monitoring
  • Hadoop HDFS is used in the commit phase
  • Ceph stores the source data and the produced data

Figure 2. Hadoop Storage Connectors

Spark interacts with Ceph through Hadoop, mainly through the Hadoop FileSystem interface. Ceph provides an object storage interface called RGW, which is accessed through RESTful HTTP. The Hadoop S3A connector acts as a conversion layer that translates Hadoop FileSystem interface calls into the RESTful HTTP requests that RGW understands. In the evolution of this conversion layer, the connector has changed from the original S3N (S3 Native; some S3N code remains, but it is no longer usable) to the current S3A connector.
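To make the conversion concrete, the snippet below (a minimal sketch in Scala, assuming the fs.s3a.* settings and the benchmarks bucket that are configured later in this article) drives the same Hadoop FileSystem interface against an s3a:// path; each call is translated by the S3A connector into HTTP requests to RGW.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Resolving an s3a:// URI returns an S3AFileSystem instance; the calls below
// are turned into HEAD/GET requests against the Ceph RGW endpoint.
val conf = new Configuration()   // picks up the fs.s3a.* keys from core-site.xml
val fs   = FileSystem.get(new URI("s3a://benchmarks/"), conf)
fs.listStatus(new Path("s3a://benchmarks/HiBench/"))   // directory listing via GET ?prefix=...
  .foreach(status => println(status.getPath))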

A task writes output to storage through the Hadoop OutputCommitter, which consists of a single job committer and one or more task committers. Because individual tasks may fail or run too slowly, the same work may be attempted by multiple tasks. To ensure that only one task attempt is committed, and that the transaction terminates consistently in a distributed environment, each connector needs to implement the corresponding functions of the commit protocol shown in Figure 3, which illustrates the sequence diagram of the Spark commit protocol.

Figure 3. Spark Commit Protocol

 

Figure 4. Spark Architecture

Before introducing the commit protocol, let's look at the Spark architecture. As shown in Figure 4, there is one driver node and multiple distributed worker nodes. Each worker node runs one or more executors, which are responsible for running tasks. Executors register themselves with the driver, and the driver keeps track of all executors at all times.

Now we use the DirectoryStagingCommitter as an example to walk through the commit protocol shown in Figure 3. When the S3A filesystem is initialized (it is not a real filesystem, but a distributed object store masquerading as one), it needs to confirm that the bucket exists:

Method    Example Parameters
HEAD      /benchmarks/

In Job Setup, the cluster-filesystem committer, wrappedCommitter, is created and initialized. The cluster filesystem used here is HDFS. The job also needs to list the files under the input directory, which involves the following HTTP requests:

Method    Example Parameters
HEAD      /benchmarks/HiBench/Terasort/Input
HEAD      /benchmarks/HiBench/Terasort/Input/
GET       /benchmarks/?delimiter=/&max-keys=1&prefix=HiBench/Terasort/Input/
GET       /benchmarks/?delimiter=/&max-keys=5000&prefix=HiBench/Terasort/Input/

In Task Setup, a unique path in the local filesystem is required. Each task writes its data into the local filesystem of the server where it runs, and wrappedCommitter.setupTask() is invoked.

Method    Example Parameters
HEAD      /benchmarks/HiBench/Terasort/Input/part-m-00006-job_1600929687869_0017
GET       /benchmarks/HiBench/Terasort/Input/part-m-00006-job_1600929687869_0017

In Needs Task Commit, a task reports that it needs to commit only if it has generated result data in the local filesystem.

In Task Abort, all staged data is removed and the wrappedCommitter is aborted.

In Task Commit, if the task is given permission to commit its output, it initiates multipart uploads of all files under its local staging path. The data is uploaded to S3, but the multipart uploads are not completed. wrappedCommitter.commitTask() is invoked.

Method    Example Parameters
POST      /benchmarks/HiBench/Terasort/Output/part-r-00014-job_20200924181548_0000?uploads
PUT       /benchmarks/HiBench/Terasort/Output/part-r-00014-job_20200924181548_0000?uploadId=2%7EHWJplA89f0UUieQS4RCcElk6af-_OYM&partNumber=1

In Job Commit, the job committer processes the manifests of the pending uploads and completes the multipart uploads. Only then do the output objects become visible in the bucket.

Method    Example Parameters
POST      /benchmarks/HiBench/Terasort/Output/part-r-00014-job_20200924181548_0000?uploadId=2%7EHWJplA89f0UUieQS4RCcElk6af-_OYM
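The split between Task Commit (parts uploaded) and Job Commit (upload completed) corresponds to plain S3 multipart calls. The sketch below reproduces it with the AWS Java SDK (the same aws-java-sdk-bundle that S3A depends on, downloaded later in this article); the object key and staged file path are hypothetical, while the endpoint and credentials match the configuration used here.

import java.io.File
import scala.collection.JavaConverters._
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{CompleteMultipartUploadRequest, InitiateMultipartUploadRequest, UploadPartRequest}

// S3 client pointed at the Ceph RGW endpoint (path-style access, no SSL).
val s3 = AmazonS3ClientBuilder.standard()
  .withEndpointConfiguration(new EndpointConfiguration("http://192.168.0.8:8000", "us-east-1"))
  .withPathStyleAccessEnabled(true)
  .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ceph2020", "ceph2020")))
  .build()

val bucket = "benchmarks"
val key    = "HiBench/Terasort/Output/part-r-00014-example"   // hypothetical key
val staged = new File("/tmp/staging/part-r-00014-example")    // hypothetical staged local file

// Task Commit: start the multipart upload and push the staged part; the object is not visible yet.
val uploadId = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)).getUploadId
val part = s3.uploadPart(new UploadPartRequest()
  .withBucketName(bucket).withKey(key).withUploadId(uploadId)
  .withPartNumber(1).withFile(staged).withPartSize(staged.length()))

// Job Commit: completing the upload is the step that finally makes the object visible.
s3.completeMultipartUpload(
  new CompleteMultipartUploadRequest(bucket, key, uploadId, List(part.getPartETag).asJava))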

In Job Cleanup, all incomplete uploads are aborted and the staged files and unique paths in the local filesystem are removed. wrappedCommitter.cleanupTask() is invoked.

Method    Example Parameters
POST      /benchmarks/?delete
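Putting the phases together, they correspond one-to-one to the hook points of Hadoop's OutputCommitter API, which every connector has to implement. The skeleton below is a simplified sketch in Scala (not the actual S3A source); the comments map each hook to the staging-committer behavior described above, and all method bodies are placeholders.

import org.apache.hadoop.mapreduce.{JobContext, JobStatus, OutputCommitter, TaskAttemptContext}

// Simplified skeleton of the commit-protocol hooks; bodies are placeholders.
class StagingCommitterSketch extends OutputCommitter {
  override def setupJob(job: JobContext): Unit = ()             // Job Setup: create and init wrappedCommitter on HDFS
  override def setupTask(task: TaskAttemptContext): Unit = ()   // Task Setup: allocate a unique local staging path
  override def needsTaskCommit(task: TaskAttemptContext): Boolean =
    true                                                        // placeholder: true only if local output exists
  override def commitTask(task: TaskAttemptContext): Unit = ()  // Task Commit: initiate multipart uploads, leave them open
  override def abortTask(task: TaskAttemptContext): Unit = ()   // Task Abort: delete staged local data
  override def commitJob(job: JobContext): Unit = ()            // Job Commit: complete all pending uploads
  override def abortJob(job: JobContext, state: JobStatus.State): Unit = ()  // abort pending uploads
}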

 

 

 

How to use

This section shows how to use Spark as the computing engine, Yarn as the resource management platform, and Ceph as the storage backend.

Software configuration

Software    Version
Ubuntu*     18.04.5 LTS
Hadoop      3.3.0
Spark       spark-3.0.1-bin-hadoop3.2
Java*       openjdk 11.0.9
Scala       2.12.12
Python3*    3.6.9
Ceph        15.2.4

Deployment diagram

Figure 5. Deployment diagram

Due to hardware limitations, this deployment uses three servers for Ceph OSDs and two servers that serve as both WorkerNode and DataNode. The Secondary NameNode usually runs on a different machine from the primary NameNode, so we use an idle server as the Secondary NameNode.

Hardware information

Hardware        Model             Description
Disk            P4610             Used as the S3A cache disk and as Ceph storage disks.
Network card    40Gbps            Public and cluster networks of Ceph; network of Spark and HDFS.
CPU             Platinum 8180M    Client and storage nodes use the same CPU (112 CPUs).

How to install

For the installation and deployment of Ceph, refer to https://docs.ceph.com/en/latest/install/

Create Ceph RGW S3 user:

$ radosgw-admin user create --uid="spark" --display-name="spark" --secret-key=ceph2020 --access-key=ceph2020

Install java:

$ sudo apt install -y openjdk-11-jre-headless

$ sudo apt install -y openjdk-11-jdk-headless

Install scala:

$ wget https://downloads.lightbend.com/scala/2.12.12/scala-2.12.12.deb

$ sudo dpkg -i scala-2.12.12.deb

Download the Spark and Hadoop installation package

Hadoop: https://hadoop.apache.org/releases.html

Spark: https://spark.apache.org/downloads.html

S3A configuration

Assume that the Spark environment using HDFS as the backend storage has been configured according to the deployment diagram. The following section describes how to add the S3A configuration.

Since Spark does not ship the S3A-related jars by default, we first need to download the jars matching the Hadoop version that the Spark distribution was built against; hadoop-aws-3.2.0 is used here. Put them into the ${SPARK_HOME}/jars directory.

$ wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar

$ wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar

 

At the same time, you need to upload the newly downloaded jars to the directory specified by spark.yarn.archive or spark.yarn.jars so that the Spark runtime jars are accessible from the YARN side.

To let Spark access Ceph, add the following fields to the ${HADOOP_DIR}/etc/hadoop/core-site.xml file. For simplicity, only the corresponding names and values are listed below.

fs.s3a.access.key=ceph2020

fs.s3a.secret.key=ceph2020

fs.s3a.endpoint=192.168.0.8:8000

fs.s3a.connection.ssl.enabled=false

fs.s3a.path.style.access=true

fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
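For reference, the same keys can also be supplied per application rather than cluster-wide: Spark forwards any property prefixed with spark.hadoop. into the Hadoop Configuration that S3A reads. A minimal Scala sketch using the endpoint and credentials above (the input path at the end is hypothetical):

import org.apache.spark.sql.SparkSession

// Each spark.hadoop.* entry ends up in the Hadoop Configuration used by S3AFileSystem.
val spark = SparkSession.builder()
  .appName("s3a-on-ceph")
  .config("spark.hadoop.fs.s3a.endpoint", "192.168.0.8:8000")
  .config("spark.hadoop.fs.s3a.access.key", "ceph2020")
  .config("spark.hadoop.fs.s3a.secret.key", "ceph2020")
  .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .getOrCreate()

// Quick reachability check against a hypothetical dataset in the bucket.
spark.read.textFile("s3a://benchmarks/HiBench/Wordcount/Input").count()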

 

The S3A OutputCommitter has four implementations, selected by fs.s3a.committer.name:

fs.s3a.committer.name    Committer
directory                directory staging committer
partitioned              partitioned staging committer (for use in Spark only)
magic                    the "magic" committer
file                     the original and unsafe file committer (default)

 

When output is stored on HDFS, the same work may be executed by multiple task attempts to guard against tasks that fail or run too slowly. An atomic rename operation then ensures that the result of only one attempt is committed. Object storage, however, has no rename API, and the default S3A committer emulates rename with copy and delete operations, which leads to write amplification and performance degradation.

In object storage, completing a multipart upload is itself an atomic operation. The two newer staging committers developed by Netflix, directory and partitioned, build on such native object storage operations and avoid rename altogether. The magic committer is not mature yet, so it is not discussed here.

Here we use the directory committer. The relevant parameter configuration is shown below:

fs.s3a.committer.name=directory

fs.s3a.committer.magic.enabled=false

fs.s3a.committer.staging.tmp.path=/tmp/staging

fs.s3a.committer.staging.abort.pending.uploads=true

fs.s3a.committer.staging.conflict-mode=append

fs.s3a.committer.staging.unique-filenames=true

fs.s3a.buffer.dir=${hadoop.tmp.dir}/s3a  #need a high-performance disk

fs.s3a.fast.upload.buffer=disk

fs.s3a.fast.upload=true

fs.s3a.fast.upload.active.blocks=256

fs.s3a.block.size=128M

fs.s3a.max.total.tasks=512

fs.s3a.multipart.size=512M

fs.s3a.multipart.threshold=2147483647

fs.s3a.committer.threads=256
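One way to check that the directory committer (rather than the default rename-based file committer) was actually used for a job is to inspect the _SUCCESS marker: the S3A committers write a JSON summary into it, whereas the classic file committer leaves it empty. A small Scala sketch, with a hypothetical job output path:

import java.net.URI
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// A non-empty, JSON-formatted _SUCCESS file indicates an S3A committer committed the job.
val fs      = FileSystem.get(new URI("s3a://benchmarks/"), new Configuration())
val success = new Path("s3a://benchmarks/HiBench/Terasort/Output/_SUCCESS")   // hypothetical output
val buf     = new Array[Byte](fs.getFileStatus(success).getLen.toInt)
val in      = fs.open(success)
in.readFully(0, buf)
in.close()
println(new String(buf, StandardCharsets.UTF_8))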

Other relevant parameters are listed below:

fs.s3a.threads.max=512

fs.s3a.connection.maximum=1024

fs.s3a.connection.establish.timeout=5000

fs.s3a.connection.timeout=200000

fs.s3a.socket.recv.buffer=65536

fs.s3a.socket.send.buffer=65536

fs.s3a.experimental.input.fadvise=normal

fs.s3a.readahead.range=64K

Test application

Here we select HiBench as the Spark application for testing and use Terasort, Kmeans, and Wordcount as the test workloads. Before testing, we need to create a bucket in Ceph; here we create a bucket named benchmarks.

Get HiBench

$ git clone https://github.com/Intel-bigdata/HiBench.git

 

Compile

$ sudo apt install maven

$ mvn -Dspark=3.0.1 -Dscala=2.12.12 clean package  # Note: remember to adapt the versions

Note: Some changes have been made in HiBench to adapt it to the versions used here. Please refer to the GitHub documentation on building HiBench for specific versions: https://github.com/Intel-bigdata/HiBench/blob/master/docs/build-hibench.md

 

Change configuration options:

# in ${Hibench}/conf/spark.conf

hibench.spark.home                ${SPARK_HOME}


hibench.yarn.executor.num         20

hibench.yarn.executor.cores       4


spark.executor.memory             16g

spark.driver.memory               100g


# in ${Hibench}/conf/hadoop.conf

hibench.hadoop.home                ${HADOOP_HOME}


hibench.hdfs.master                s3a://benchmarks/

hibench.hadoop.release             apache


# in ${Hibench}/conf/hibench.conf

hibench.scale.profile                gigantic

hibench.default.map.parallelism      80

hibench.default.shuffle.parallelism  80

 

Test Results:

Figure 6. Influence of the number of input and output files

Varying both the mapper number and the reducer number shows that performance is best when the reducer number is close to the total number of cores across all executors. Therefore, the next step of tuning can be biased towards the reducer number.

Note: The mapper number and reducer number are derived from the HiBench configuration parameters. Here we can simply treat the mapper number as the number of input files and the reducer number as the number of output files.

Since S3 supports ranged GET requests, the number of input files does not affect the overall performance. The number of output files, however, affects how concurrently the results are written to Ceph.

Figure 7. Influence of the number of CPUs

With the total number of cores and the memory size held constant (we kept it to 4 GB per core), it is best to keep the number of cores per executor between two and four, as shown in Figure 7.

Figure 8. Influence of different fadvise policies

The test results show that different applications suit different fadvise policies, but we recommend the normal policy for input. The normal policy starts reading a file in sequential mode and switches to random mode if the caller seeks backwards in the stream.

Summary

In this article, we explained how to use S3A to access and store Spark-generated data on Ceph through the Ceph RGW interface. We outlined the Spark architecture and described the Spark commit protocol in detail to explain the implementation of S3A, and then provided steps to conduct performance testing. The results show that the number of cores per executor and the fadvise policy selection impact application workloads. With S3A, Spark can access Ceph directly, meeting diverse workload and data intelligence needs without maintaining a separate HDFS cluster. As data expands, the scale-out capability of Ceph shows more benefit compared to HDFS.
