Run secure processing jobs using PySpark in Amazon SageMaker Pipelines
In this post, we explain how to run PySpark processing jobs within a pipeline. This enables anyone who wants to train a model using Pipelines to also preprocess training data, postprocess inference data, or evaluate models using PySpark. This capability is especially relevant when you need to process large-scale data. In addition, we showcase how to optimize your PySpark steps using configurations and Spark UI logs.
Pipelines is an Amazon SageMaker capability for building and managing end-to-end machine learning (ML) workflows as a series of interconnected steps.
When processing large-scale data, data scientists and ML engineers often use Apache Spark. SageMaker provides prebuilt Spark container images that make it straightforward to run distributed PySpark workloads with SageMaker Processing, without managing your own cluster.
In our example, we create a SageMaker pipeline running a single processing step. For more information about what other steps you can add to a pipeline, refer to the SageMaker Pipelines documentation.
SageMaker Processing library
SageMaker Processing can run with specific frameworks (for example, SKLearnProcessor or PySparkProcessor). Independent of the framework used, a ProcessingStep requires the following:
- Step name – The name to be used for your SageMaker pipeline step
- Step arguments – The arguments for your ProcessingStep

Additionally, you can provide the following:
- The configuration for your step cache in order to avoid unnecessary runs of your step in a SageMaker pipeline
- A list of step names, step instances, or step collection instances that the ProcessingStep depends on
- The display name of the ProcessingStep
- A description of the ProcessingStep
- Property files
- Retry policies
The arguments are handed over to the ProcessingStep. You can build them with the framework-specific processors that the SageMaker Python SDK provides. Each processor comes with its own needs, depending on the framework. This is best illustrated using the PySparkProcessor, where you can pass additional information to optimize the ProcessingStep further, for instance via the configuration parameter when running your job.
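As a minimal sketch of how these pieces fit together (assuming step_args has already been built by a processor's run() call inside a PipelineSession, as shown later in this post), a ProcessingStep with its required and some of its optional arguments could look like the following:

```python
from sagemaker.workflow.steps import ProcessingStep, CacheConfig

# step_args is assumed to be the output of processor.run(...) under a PipelineSession
pyspark_step = ProcessingStep(
    name="pyspark-processing",                       # required: step name
    step_args=step_args,                             # required: arguments built by the processor
    cache_config=CacheConfig(enable_caching=True,    # optional: step cache to avoid unnecessary reruns
                             expire_after="p30d"),
    depends_on=[],                                   # optional: upstream steps this step waits for
    display_name="PySpark processing",               # optional: display name
    description="Reads and writes the abalone dataset with PySpark",  # optional: description
)
```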
Run SageMaker Processing jobs in a secure environment
It’s common for organizations to require that processing jobs run within a private network, without public internet access. You can establish this network isolation by defining the NetworkConfig.VpcConfig request parameter of the CreateProcessingJob API, which specifies the security groups and subnets of your VPC. We provide an example of this configuration when we build the pipeline in the next section.
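For reference, the SageMaker Python SDK exposes these VPC settings through the NetworkConfig class, which the processors accept via their network_config parameter. A minimal sketch (the subnet and security group IDs are placeholders) could look like the following:

```python
from sagemaker.network import NetworkConfig

# Placeholder subnet and security group IDs; replace with the resources of your VPC
network_config = NetworkConfig(
    subnets=["subnet-0123456789abcdef0"],
    security_group_ids=["sg-0123456789abcdef0"],
    encrypt_inter_container_traffic=True,  # encrypt traffic between processing containers
)
```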
PySpark ProcessingStep within SageMaker Pipelines
For this example, we assume that you already have Amazon SageMaker Studio deployed in a secure environment, including a VPC, VPC endpoints, security groups, AWS Identity and Access Management (IAM) roles, and AWS Key Management Service (AWS KMS) keys.
As an example, we set up a pipeline containing a single ProcessingStep in which we're simply reading and writing the abalone dataset using Spark. The code samples show you how to set up and configure the ProcessingStep.
We define parameters for the pipeline (name, role, buckets, and so on) and step-specific settings (instance type and count, framework version, and so on). In this example, we use a secure setup and also define subnets, security groups, and the inter-container traffic encryption. For this example, you need a pipeline execution role with SageMaker full access and a VPC. See the following code:
{
    "pipeline_name": "ProcessingPipeline",
    "trial": "test-blog-post",
    "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>",
    "network_subnet_ids": [
        "subnet-<SUBNET_ID>",
        "subnet-<SUBNET_ID>"
    ],
    "network_security_group_ids": [
        "sg-<SG_ID>"
    ],
    "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
    "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
    "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py",
    "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json",
    "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py",
    "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}",
    "pyspark_framework_version": "2.4",
    "pyspark_process_name": "pyspark-processing",
    "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv",
    "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output",
    "pyspark_process_instance_type": "ml.m5.4xlarge",
    "pyspark_process_instance_count": 6,
    "tags": {
        "Project": "tag-for-project",
        "Owner": "tag-for-owner"
    }
}
To demonstrate, the following code example runs a PySpark script on SageMaker Processing within a pipeline by using the PySparkProcessor:
# import code requirements
# standard libraries import
import logging
import json

# sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor

from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config


def create_pipeline(pipeline_params, logger):
    """
    Args:
        pipeline_params (dict): pipeline parameters loaded from ml_pipeline/params/pipeline_params.json
        logger (logger): logger
    Returns:
        pipeline (sagemaker.workflow.pipeline.Pipeline): the upserted SageMaker pipeline
    """
    # Create SageMaker Session
    sagemaker_session = PipelineSession()

    # Get Tags
    tags_input = get_tags_input(pipeline_params["tags"])

    # get network configuration
    network_config = get_network_configuration(
        subnets=pipeline_params["network_subnet_ids"],
        security_group_ids=pipeline_params["network_security_group_ids"]
    )

    # Get Pipeline Configurations
    pipeline_config = get_pipeline_config(pipeline_params)

    # setting processing cache obj
    logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration to 30 days")
    cache_config = CacheConfig(enable_caching=True, expire_after="p30d")

    # Create PySpark Processing Step
    logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor")

    # setting up spark processor
    processing_pyspark_processor = PySparkProcessor(
        base_job_name=pipeline_params["pyspark_process_name"],
        framework_version=pipeline_params["pyspark_framework_version"],
        role=pipeline_params["pipeline_role"],
        instance_count=pipeline_params["pyspark_process_instance_count"],
        instance_type=pipeline_params["pyspark_process_instance_type"],
        volume_kms_key=pipeline_params["pyspark_process_volume_kms"],
        output_kms_key=pipeline_params["pyspark_process_output_kms"],
        network_config=network_config,
        tags=tags_input,
        sagemaker_session=sagemaker_session
    )

    # setting up arguments
    run_args = processing_pyspark_processor.run(
        submit_app=pipeline_params["pyspark_process_code"],
        submit_py_files=[pipeline_params["pyspark_helper_code"]],
        arguments=[
            # processing input arguments. To add new arguments to this list you need to provide two entries:
            # 1st is the argument name preceded by "--" and the 2nd is the argument value
            # setting up processing arguments
            "--input_table", pipeline_params["pyspark_process_data_input"],
            "--output_table", pipeline_params["pyspark_process_data_output"]
        ],
        spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]),
        inputs=[
            ProcessingInput(
                source=pipeline_params["spark_config_file"],
                destination="/opt/ml/processing/input/conf",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="FullyReplicated",
                s3_compression_type="None"
            )
        ],
    )

    # create step
    pyspark_processing_step = ProcessingStep(
        name=pipeline_params["pyspark_process_name"],
        step_args=run_args,
        cache_config=cache_config,
    )

    # Create Pipeline
    pipeline = Pipeline(
        name=pipeline_params["pipeline_name"],
        steps=[
            pyspark_processing_step
        ],
        pipeline_experiment_config=PipelineExperimentConfig(
            pipeline_params["pipeline_name"],
            pipeline_config["trial"]
        ),
        sagemaker_session=sagemaker_session
    )
    pipeline.upsert(
        role_arn=pipeline_params["pipeline_role"],
        description="Example pipeline",
        tags=tags_input
    )
    return pipeline


def main():
    # set up logging
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    logger.info("Get Pipeline Parameter")
    with open("ml_pipeline/params/pipeline_params.json", "r") as f:
        pipeline_params = json.load(f)
    print(pipeline_params)

    logger.info("Create Pipeline")
    pipeline = create_pipeline(pipeline_params, logger=logger)

    logger.info("Execute Pipeline")
    execution = pipeline.start()
    return execution


if __name__ == "__main__":
    main()
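The helper modules imported above (helpers.infra.networking.networking, helpers.infra.tags.tags, and helpers.pipeline_utils) are not shown in this post. As a rough sketch of what they might contain (only the function names come from the imports; the implementations here are illustrative assumptions), they wrap the SDK's NetworkConfig, translate the tag dictionary into the list format that the SageMaker API expects, and derive step-specific settings such as the trial name:

```python
# Illustrative sketch of the helper functions used by the pipeline script above.
# The bodies are assumptions; adapt them to your own infrastructure conventions.
from sagemaker.network import NetworkConfig


def get_network_configuration(subnets, security_group_ids):
    """Wrap the VPC settings in a SageMaker NetworkConfig object."""
    return NetworkConfig(
        subnets=subnets,
        security_group_ids=security_group_ids,
        encrypt_inter_container_traffic=True,
    )


def get_tags_input(tags):
    """Convert a {"Key": "Value"} dictionary into the list format expected by the SageMaker API."""
    return [{"Key": key, "Value": value} for key, value in tags.items()]


def get_pipeline_config(pipeline_params):
    """Derive pipeline-level settings, such as the experiment trial name, from the parameters."""
    return {"trial": pipeline_params["trial"]}
```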
As shown in the pipeline code, we're overwriting the default Spark configurations by providing configuration.json as a ProcessingInput. We use a configuration.json file that was saved in Amazon S3 (referenced by the spark_config_file parameter) with the following settings:
[
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "10g",
            "spark.executor.memoryOverhead": "5g",
            "spark.driver.memory": "10g",
            "spark.driver.memoryOverhead": "10g",
            "spark.driver.maxResultSize": "10g",
            "spark.executor.cores": "5",
            "spark.executor.instances": "5",
            "spark.yarn.maxAppAttempts": "1",
            "spark.hadoop.fs.s3a.endpoint": "s3.<region>.amazonaws.com",
            "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "true"
        }
    }
]
We can update the default Spark configuration either by passing the file as a ProcessingInput or by using the configuration argument when running the run() function.
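As an alternative to the ProcessingInput approach used in this post, the following sketch shows how the same settings could be passed through the run() method's configuration parameter. The processor and pipeline_params are assumed to be those defined in the pipeline script above, and only a subset of the properties is shown:

```python
# Sketch: passing the Spark configuration inline instead of as a ProcessingInput
spark_configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "10g",
            "spark.executor.cores": "5",
            "spark.executor.instances": "5",
        },
    }
]

run_args = processing_pyspark_processor.run(
    submit_app=pipeline_params["pyspark_process_code"],
    submit_py_files=[pipeline_params["pyspark_helper_code"]],
    arguments=[
        "--input_table", pipeline_params["pyspark_process_data_input"],
        "--output_table", pipeline_params["pyspark_process_data_output"],
    ],
    configuration=spark_configuration,  # Spark settings passed directly to run()
    spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]),
)
```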
The Spark configuration is dependent on other options, like the instance type and instance count chosen for the processing job. The first consideration is the number of instances, the vCPU cores that each of those instances has, and the instance memory. You can use the instance specifications (the vCPUs and memory available per instance type) as a starting point for sizing the executors and the driver.
In addition, the executor and driver settings can be optimized even further. For an example of how to calculate these, refer to the post Best practices for successfully managing memory for Apache Spark applications on Amazon EMR.
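As a rough illustration of that sizing heuristic (the values below are an assumption for illustration, not the settings used in this post), the following sketch derives executor settings for the ml.m5.4xlarge instances from our pipeline parameters, which provide 16 vCPUs and 64 GiB of memory each:

```python
# Rough sizing heuristic for Spark executors, assuming ml.m5.4xlarge instances
# (16 vCPUs, 64 GiB memory) and 6 instances, as in the pipeline parameters above.
instance_count = 6
vcpus_per_instance = 16
memory_gib_per_instance = 64

cores_per_executor = 5                                        # common rule of thumb for good S3/HDFS throughput
usable_cores = vcpus_per_instance - 1                         # leave 1 vCPU for the OS and daemons
executors_per_instance = usable_cores // cores_per_executor   # 3 executors per instance
total_executors = executors_per_instance * instance_count - 1 # 17, reserving one slot for the driver

memory_per_executor = memory_gib_per_instance // executors_per_instance  # 21 GiB per executor slot
executor_memory_gib = int(memory_per_executor * 0.9)          # ~18 GiB for spark.executor.memory
executor_overhead_gib = memory_per_executor - executor_memory_gib        # ~3 GiB for memoryOverhead

print(f"spark.executor.cores={cores_per_executor}")
print(f"spark.executor.instances={total_executors}")
print(f"spark.executor.memory={executor_memory_gib}g")
print(f"spark.executor.memoryOverhead={executor_overhead_gib}g")
```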
Beyond the driver and executor settings, we recommend investigating the committer settings to improve performance when writing to Amazon S3. In our case, we're writing Parquet files to Amazon S3 and setting spark.sql.parquet.fs.optimized.committer.optimization-enabled to true.
If needed for a connection to Amazon S3, a Regional endpoint spark.hadoop.fs.s3a.endpoint can be specified within the configuration file.
In this example pipeline, the PySpark script process_pyspark.py (as shown in the following code) loads a CSV file from Amazon S3 into a Spark data frame, and saves the data as Parquet back to Amazon S3.
Note that our example configuration is not proportionate to the workload because reading and writing the abalone dataset could be done on default settings on one instance. The configurations we mentioned should be defined based on your specific needs.
# import requirements
import argparse
import logging
import sys
import os
import pandas as pd

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType

from data_utils import (
    spark_read_parquet,
    Unbuffered
)

sys.stdout = Unbuffered(sys.stdout)

# Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)


def main(data_path):
    spark = SparkSession.builder.appName("PySparkJob").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", FloatType(), True),
            StructField("diameter", FloatType(), True),
            StructField("height", FloatType(), True),
            StructField("whole_weight", FloatType(), True),
            StructField("shucked_weight", FloatType(), True),
            StructField("viscera_weight", FloatType(), True),
            StructField("rings", FloatType(), True),
        ]
    )

    df = spark.read.csv(data_path, header=False, schema=schema)
    return df.select("sex", "length", "diameter", "rings")


if __name__ == "__main__":
    logger.info("===============================================================")
    logger.info("================= Starting pyspark-processing =================")
    parser = argparse.ArgumentParser(description="app inputs")
    parser.add_argument("--input_table", type=str, help="path to the channel data")
    parser.add_argument("--output_table", type=str, help="path to the output data")
    args = parser.parse_args()

    df = main(args.input_table)

    logger.info("Writing transformed data")
    df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite")

    # save data
    df.coalesce(10).write.mode("overwrite").parquet(args.output_table)

    logger.info("================== Ending pyspark-processing ==================")
    logger.info("===============================================================")
To dive into optimizing Spark processing jobs, you can use the CloudWatch logs as well as the Spark UI. You can create the Spark UI by running a Spark history server on a SageMaker notebook instance and pointing it at the event logs that the processing job persists to the S3 location defined by spark_event_logs_s3_uri. You can then view the Spark UI to inspect how the jobs, stages, and tasks of your processing step ran.
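The SageMaker Python SDK can start such a history server for you. A minimal sketch (assuming Docker is available in the environment, which this feature requires, and using placeholder values consistent with the parameters above) could look like the following:

```python
# Sketch: start a local Spark history server to browse the persisted event logs.
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="spark-ui",
    framework_version="2.4",
    role="<PIPELINE_EXECUTION_ROLE_ARN>",   # placeholder role ARN
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# Point the history server at the S3 prefix used for spark_event_logs_s3_uri
spark_processor.start_history_server(
    spark_event_logs_s3_uri="s3://<DATA_S3_BUCKET>/spark_ui_logs/test-blog-post"
)

# ... browse the Spark UI, then shut the server down
spark_processor.terminate_history_server()
```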
Clean up
If you followed the tutorial, it’s good practice to delete resources that are no longer used to stop incurring charges. Make sure to delete the SageMaker pipeline, shut down any Studio apps or notebook instances you no longer need, and remove the data that the example wrote to Amazon S3.
Conclusion
In this post, we showed how to run a secure SageMaker Processing job using PySpark within SageMaker Pipelines. We also demonstrated how to optimize PySpark using Spark configurations and set up your Processing job to run in a secure networking configuration.
As a next step, explore how to automate the entire model lifecycle with SageMaker Pipelines and how to extend your pipeline with additional steps, such as training, model evaluation, and model registration.
About the Authors
Maren Suilmann is a Data Scientist at
Maira Ladeira Tanke is an ML Specialist at Amazon Web Services. With a background in data science, she has 9 years of experience architecting and building ML applications with customers across industries. As a technical lead, she helps customers accelerate their achievement of business value through emerging technologies and innovative solutions. In her free time, Maira enjoys traveling and spending time with her family someplace warm.
Pauline Ting is Data Scientist in the
Donald Fossouo is a Sr Data Architect in the