Spark ETL FLEX jobs - L2 Constructs #2

Merged 4 commits · Jun 18, 2024
7 changes: 7 additions & 0 deletions packages/@aws-cdk/aws-glue-alpha/lib/constants.ts
@@ -51,6 +51,12 @@ export enum WorkerType {
Z_2X = 'Z.2X',
}

/**
* The number of workers of a defined workerType that are allocated when a job runs.
*
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html
*/

/**
* Job states emitted by Glue to CloudWatch Events.
*
@@ -196,6 +202,7 @@ export enum PythonVersion {
* Python 3.9 (the exact version depends on GlueVersion and JobCommand used)
*/
THREE_NINE = '3.9',

}

/**
6 changes: 4 additions & 2 deletions packages/@aws-cdk/aws-glue-alpha/lib/index.ts
@@ -15,10 +15,12 @@ export * from './storage-parameter';
export * from './constants';
export * from './jobs/job';
export * from './jobs/pyspark-etl-job';
export * from './jobs/pysparkflex-etl-job';
export * from './jobs/pyspark-streaming-job';
export * from './jobs/python-shell-job';
export * from './jobs/scala-spark-etl-job';
export * from './jobs/ray-job';
export * from './jobs/pyspark-streaming-job';
export * from './jobs/scala-spark-etl-job';
export * from './jobs/scala-spark-flex-etl-job';
export * from './jobs/scala-spark-streaming-job';
export * from './jobs/spark-ui-utils';
export * from './table-base';
195 changes: 195 additions & 0 deletions packages/@aws-cdk/aws-glue-alpha/lib/jobs/pysparkflex-etl-job.ts
@@ -0,0 +1,195 @@
import * as iam from 'aws-cdk-lib/aws-iam';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { CfnJob } from 'aws-cdk-lib/aws-glue';
import { Job, JobProperties } from './job';
import { Construct } from 'constructs';
import { JobType, GlueVersion, JobLanguage, PythonVersion, WorkerType, ExecutionClass } from '../constants';
import { SparkUIProps, SparkUILoggingLocation, validateSparkUiPrefix, cleanSparkUiPrefixForGrant } from './spark-ui-utils';
import * as cdk from 'aws-cdk-lib/core';
import { Code } from '../code';

/**
* Flex Jobs class
*
* Flex jobs support the Python and Scala languages.
* The flexible execution class is appropriate for non-urgent jobs such as
* pre-production jobs, testing, and one-time data loads.
* Flexible job runs are supported for jobs using AWS Glue version 3.0 or later and G.1X or
* G.2X worker types, and default to the latest version of Glue (currently Glue 3.0).
*
* Similar to ETL jobs, these features are enabled by default: --enable-metrics,
* --enable-spark-ui, --enable-continuous-cloudwatch-log
*/

export interface PySparkFlexEtlJobProps extends JobProperties {

Review comment (Collaborator): Are these in the RFC? Cannot see them

Reply (Author): These are the Props interface; they are not required to be passed during invocation.

/**
* Enables the Spark UI debugging and monitoring with the specified props.
*
* @default - Spark UI debugging and monitoring is disabled.
*
* @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
*/
readonly sparkUI?: SparkUIProps;

/**
* Specifies configuration properties of a job run delay notification (optional).
* After a job run starts, the number of minutes to wait before sending a job run delay notification.
* @default - no delay notification
*/
readonly notifyDelayAfter?: cdk.Duration;

/**
* Additional Python files that AWS Glue adds to the Python path before executing your script.
*
* @default - no extra python files specified.
*
* @see `--extra-py-files` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
*/
readonly extraPythonFiles?: Code[];

/**
* Additional files, such as configuration files that AWS Glue copies to the working directory of your script before executing it.
*
* @default - no extra files specified.
*
* @see `--extra-files` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
*/
readonly extraFiles?: Code[];

}

/**
* A PySpark Flex ETL Glue Job
*/
export class PySparkFlexEtlJob extends Job {

// Implement abstract Job attributes
public readonly jobArn: string;
public readonly jobName: string;
public readonly role: iam.IRole;
public readonly grantPrincipal: iam.IPrincipal;

/**
* The Spark UI logs location if Spark UI monitoring and debugging is enabled.
*
* @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html
* @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
*/
public readonly sparkUILoggingLocation?: SparkUILoggingLocation;

/**
* PySparkFlexEtlJob constructor
*
* @param scope the scope of this construct
* @param id the id of this construct
* @param props the job properties
*/
constructor(scope: Construct, id: string, props: PySparkFlexEtlJobProps) {
super(scope, id, {
physicalName: props.jobName,
});

// Set up role and permissions for principal
this.role = props.role ?? new iam.Role(this, 'JobRole', {
assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
managedPolicies: [iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')],
});
this.grantPrincipal = this.role;

// Set up Spark UI logging if a bucket is provided
const sparkUIArgs = props.sparkUI?.bucket ? this.setupSparkUI(this.role, props.sparkUI) : undefined;
this.sparkUILoggingLocation = sparkUIArgs?.location;

// Enable continuous logging if configured, and CloudWatch profiling metrics by default
const continuousLoggingArgs = props.continuousLogging?.enabled ? this.setupContinuousLogging(this.role, props.continuousLogging) : {};
const profilingMetricsArgs = { '--enable-metrics': '' };

// Gather executable arguments
const executableArgs = this.executableArguments(props);

// Combine command line arguments into a single line item
const defaultArguments = {
...executableArgs,
...continuousLoggingArgs,
...profilingMetricsArgs,
...sparkUIArgs?.args,
...this.checkNoReservedArgs(props.defaultArguments),
};
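For illustration, the precedence of this merge can be sketched with plain objects (the keys and values below are stand-ins, not the construct's actual output): later spreads win, so entries spread last override the built-in flags.

```typescript
// Sketch of spread-merge precedence, with illustrative argument maps.
const executableArgs: Record<string, string> = { '--job-language': 'python' };
const profilingMetricsArgs: Record<string, string> = { '--enable-metrics': '' };
// Spread last, so these override any earlier keys.
const userArgs: Record<string, string> = { '--enable-metrics': 'true' };

const defaultArguments = {
  ...executableArgs,
  ...profilingMetricsArgs,
  ...userArgs,
};

console.log(defaultArguments['--enable-metrics']); // 'true' (the last spread wins)
```

Note that in the real construct, user-supplied arguments first pass through checkNoReservedArgs, so reserved keys may be rejected before the merge.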

const jobResource = new CfnJob(this, 'Resource', {
name: props.jobName,
description: props.description,
role: this.role.roleArn,
command: {
name: JobType.ETL,
scriptLocation: this.codeS3ObjectUrl(props.script),
pythonVersion: PythonVersion.THREE,
},
glueVersion: props.glueVersion ? props.glueVersion : GlueVersion.V3_0,
workerType: props.workerType ? props.workerType : WorkerType.G_2X,
numberOfWorkers: props.numberOfWorkers ? props.numberOfWorkers : 10,
maxRetries: props.maxRetries,
executionProperty: props.maxConcurrentRuns ? { maxConcurrentRuns: props.maxConcurrentRuns } : undefined,
notificationProperty: props.notifyDelayAfter ? { notifyDelayAfter: props.notifyDelayAfter.toMinutes() } : undefined,
timeout: props.timeout?.toMinutes(),
connections: props.connections ? { connections: props.connections.map((connection) => connection.connectionName) } : undefined,
securityConfiguration: props.securityConfiguration?.securityConfigurationName,
tags: props.tags,
executionClass: ExecutionClass.FLEX,
defaultArguments,
});

const resourceName = this.getResourceNameAttribute(jobResource.ref);
this.jobArn = this.buildJobArn(this, resourceName);
this.jobName = resourceName;
}

/**
* Set the executable arguments with best practices enabled by default
*
* @param props
* @returns An array of arguments for Glue to use on execution
*/
private executableArguments(props: PySparkFlexEtlJobProps) {
const args: { [key: string]: string } = {};
args['--job-language'] = JobLanguage.PYTHON;

if (props.extraPythonFiles && props.extraPythonFiles.length > 0) {
args['--extra-py-files'] = props.extraPythonFiles.map(code => this.codeS3ObjectUrl(code)).join(',');
}
if (props.extraFiles && props.extraFiles.length > 0) {
args['--extra-files'] = props.extraFiles.map(code => this.codeS3ObjectUrl(code)).join(',');
}

return args;
}
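The extra-files handling above reduces a list of S3 object URLs to a single comma-separated argument value. A minimal, self-contained sketch of that step, using hypothetical URLs in place of the resolved Code assets:

```typescript
// Hypothetical S3 URLs standing in for codeS3ObjectUrl(code) results.
const extraPyFileUrls = ['s3://my-bucket/utils.py', 's3://my-bucket/models.py'];

const args: { [key: string]: string } = { '--job-language': 'python' };
if (extraPyFileUrls.length > 0) {
  // Glue expects one comma-separated value for --extra-py-files.
  args['--extra-py-files'] = extraPyFileUrls.join(',');
}

console.log(args['--extra-py-files']); // 's3://my-bucket/utils.py,s3://my-bucket/models.py'
```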

/**
* Set the arguments for sparkUI with best practices enabled by default
*
* @param role the IAM role granted read/write access to the Spark UI bucket
* @param sparkUiProps Spark UI configuration properties
* @returns An object containing the Spark UI logging location and job arguments
*/
private setupSparkUI(role: iam.IRole, sparkUiProps: SparkUIProps) {
Review comment (Collaborator): Can we add some comments here to describe the function's purpose, including the arguments and return types

Reply (Author): done

validateSparkUiPrefix(sparkUiProps.prefix);
const bucket = sparkUiProps.bucket ?? new Bucket(this, 'SparkUIBucket');
bucket.grantReadWrite(role, cleanSparkUiPrefixForGrant(sparkUiProps.prefix));
const args = {
'--enable-spark-ui': 'true',
'--spark-event-logs-path': bucket.s3UrlForObject(sparkUiProps.prefix),
};

return {
location: {
prefix: sparkUiProps.prefix,
bucket,
},
args,
};
}
}
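To make the Spark UI wiring concrete, here is a minimal sketch of the arguments setupSparkUI produces, assuming a hypothetical bucket name and prefix (the real construct derives the path via bucket.s3UrlForObject and also grants the role read/write access):

```typescript
// Hypothetical bucket name and prefix; illustrative only.
const bucketName = 'my-spark-ui-bucket';
const prefix = 'logs/';

const sparkUiArgs = {
  '--enable-spark-ui': 'true',
  '--spark-event-logs-path': `s3://${bucketName}/${prefix}`,
};

console.log(sparkUiArgs['--spark-event-logs-path']); // 's3://my-spark-ui-bucket/logs/'
```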