
Commit 87f38d0
Committed Dec 28, 2019 · 1 parent af9c313

This commit includes a major revision to the project:

* All IAM policies revised to provide least-privilege access
* Configuration parameters renamed and re-documented to simplify the configuration process
* "Putting it together" section updated with instructions & AWS CLI commands for copying the Sales and Marketing sample data sets
* Python 2.7-style print statements updated
* Minor bug fixes to the GlueRunner and AthenaRunner Lambda functions

18 files changed: +493 −370 lines
 

**.gitignore** (+7 −1)

````diff
@@ -71,4 +71,10 @@ notebook
 .github
 
 # Pynt
-*.pyc
+*.pyc
+
+# Python Virtual Environment
+venv
+
+# Copy of customized configuration files
+config-copy
````

**README.md** (+46 −30)

````diff
@@ -161,11 +161,11 @@ Specifies parameters for creation of the `gluerunner-lambda` CloudFormation stack
 ```json
 [
   {
-    "ParameterKey": "SourceS3BucketName",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "SourceS3Key",
+    "ParameterKey": "LambdaSourceS3Key",
     "ParameterValue": "src/gluerunner.zip"
   },
   {
@@ -180,9 +180,9 @@ Specifies parameters for creation of the `gluerunner-lambda` CloudFormation stack
 ```
 #### Parameters:
 
-* `SourceS3BucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) from which the Glue Runner AWS Lambda function package (.zip file) will be fetched by AWS CloudFormation.
+* `ArtifactBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) in which Glue scripts and Lambda function source will be stored. **If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.**
 
-* `SourceS3Key` - The Amazon S3 key (e.g. `src/gluerunner.zip`) pointing to your AWS Lambda function's .zip package.
+* `LambdaSourceS3Key` - The Amazon S3 key (e.g. `src/gluerunner.zip`) pointing to your AWS Lambda function's .zip package in the artifact bucket.
 
 * `DDBTableName` - The Amazon DynamoDB table in which the state of active AWS Glue jobs is tracked between Glue Runner AWS Lambda function invocations.
 
@@ -196,11 +196,11 @@ Specifies parameters for creation of the `gluerunner-lambda` CloudFormation stack
 ```json
 [
   {
-    "ParameterKey": "SourceS3BucketName",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "SourceS3Key",
+    "ParameterKey": "LambdaSourceS3Key",
     "ParameterValue": "src/athenarunner.zip"
   },
   {
@@ -215,9 +215,9 @@ Specifies parameters for creation of the `gluerunner-lambda` CloudFormation stack
 ```
 #### Parameters:
 
-* `SourceS3BucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) from which the Athena Runner AWS Lambda function package (.zip file) will be fetched by AWS CloudFormation.
+* `ArtifactBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) in which Glue scripts and Lambda function source will be stored. **If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.**
 
-* `SourceS3Key` - The Amazon S3 key (e.g. `src/athenarunner.zip`) pointing to your AWS Lambda function's .zip package.
+* `LambdaSourceS3Key` - The Amazon S3 key (e.g. `src/athenarunner.zip`) pointing to your AWS Lambda function's .zip package.
 
 * `DDBTableName` - The Amazon DynamoDB table in which the state of active AWS Athena queries is tracked between Athena Runner AWS Lambda function invocations.
 
@@ -234,20 +234,20 @@ Sample content:
 ```json
 {
   "gluerunner": {
-    "SourceS3BucketName": "<NO-DEFAULT>",
-    "SourceS3Key":"src/gluerunner.zip"
+    "ArtifactBucketName": "<NO-DEFAULT>",
+    "LambdaSourceS3Key":"src/gluerunner.zip"
   },
   "ons3objectcreated": {
-    "SourceS3BucketName": "<NO-DEFAULT>",
-    "SourceS3Key":"src/ons3objectcreated.zip"
+    "ArtifactBucketName": "<NO-DEFAULT>",
+    "LambdaSourceS3Key":"src/ons3objectcreated.zip"
   }
 }
 ```
 #### Parameters:
 
-* `SourceS3BucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) to which the Glue Runner AWS Lambda function package (.zip file) will be deployed. If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.
+* `ArtifactBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) in which Glue scripts and Lambda function source will be stored. **If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.**
 
-* `SourceS3Key` - The Amazon S3 key (e.g. `src/gluerunner.zip`) for your AWS Lambda function's .zip package.
+* `LambdaSourceS3Key` - The Amazon S3 key (e.g. `src/gluerunner.zip`) for your AWS Lambda function's .zip package.
 
 >**NOTE: The values set here must match values set in `cloudformation/gluerunner-lambda-params.json`.**
@@ -260,26 +260,34 @@ Specifies parameters for creation of the `glue-resources` CloudFormation stack
 ```json
 [
   {
-    "ParameterKey": "S3ETLScriptPath",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "S3ETLOutputPath",
-    "ParameterValue": "<NO-DEFAULT>"
+    "ParameterKey": "ETLScriptsPrefix",
+    "ParameterValue": "scripts"
   },
   {
-    "ParameterKey": "SourceDataBucketName",
+    "ParameterKey": "DataBucketName",
     "ParameterValue": "<NO-DEFAULT>"
+  },
+  {
+    "ParameterKey": "ETLOutputPrefix",
+    "ParameterValue": "output"
   }
 ]
 ```
 #### Parameters:
 
-* `S3ETLScriptPath` - The Amazon S3 path (including bucket name and prefix in ``s3://example/path`` format) to which AWS Glue scripts under `glue-scripts` directory will be dpeloyed.
+* `ArtifactBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) that will be created by the `step-functions-resources.yaml` CloudFormation template. **If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.**
+
+* `ETLScriptsPrefix` - The Amazon S3 prefix (in the format ``example/path`` without leading or trailing '/') to which AWS Glue scripts will be deployed in the artifact bucket. Glue scripts can be found under the `glue-scripts` project directory
+
+* `DataBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) that will be created by the `step-functions-resources.yaml` CloudFormation template. This is the bucket to which Sales and Marketing datasets must be uploaded. It is also the bucket in which output will be created. **This bucket is created by `step-functions-resources` CloudFormation. CloudFormation stack creation will fail if the bucket already exists.**
+
+* `ETLOutputPrefix` - The Amazon S3 prefix (in the format ``example/path`` without leading or trailing '/') to which AWS Glue jobs will produce their intermediary outputs. This path will be created in the data bucket.
 
-* `S3ETLOutputPath` - The Amazon S3 path to which AWS Glue jobs will produce their intermediary outputs.
 
-* `SourceDataBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) that will be created by the `step-functions-resources.yaml` CloudFormation template. This is the bucket to which Sales and Marketing datasets must be uploaded.
 
 The parameters are used by AWS CloudFormation during the creation of `glue-resources` stack.
 
@@ -290,7 +298,7 @@ Specifies the parameters used by Glue Runner AWS Lambda function at run-time.
 
 ```json
 {
-  "sfn_activity_arn": "<NO-DEFAULT>",
+  "sfn_activity_arn": "arn:aws:states:<AWS-REGION>:<AWS-ACCOUNT-ID>:activity:GlueRunnerActivity",
   "sfn_worker_name": "gluerunner",
   "ddb_table": "GlueRunnerActiveJobs",
   "ddb_query_limit": 50,
@@ -299,7 +307,7 @@ Specifies the parameters used by Glue Runner AWS Lambda function at run-time.
 ```
 #### Parameters:
 
-* `sfn_activity_arn` - AWS Step Functions activity task ARN. This ARN is used to query AWS Step Functions for new tasks (i.e. new AWS Glue jobs to run). The ARN is a combination of the AWS region, your AWS account Id, and the name property of the [AWS::StepFunctions::Activity](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-stepfunctions-activity.html) resource in the `stepfunctions-resources.yaml` CloudFormation template. An ARN looks as follows `arn:aws:states:<AWS-REGION>:<YOUR-AWS-ACCOUNT-ID>:activity:<STEPFUNCTIONS-ACTIVITY-NAME>`. By default, the activity name is `GlueRunnerActivity`.
+* `sfn_activity_arn` - AWS Step Functions activity task ARN. This ARN is used to query AWS Step Functions for new tasks (i.e. new AWS Glue jobs to run). The ARN is a combination of the AWS region, your AWS account Id, and the name property of the [AWS::StepFunctions::Activity](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-stepfunctions-activity.html) resource in the `stepfunctions-resources.yaml` CloudFormation template. An ARN looks as follows `arn:aws:states:<AWS-REGION>:<AWS-ACCOUNT-ID>:activity:<STEPFUNCTIONS-ACTIVITY-NAME>`. By default, the activity name is `GlueRunnerActivity`.
 
 * `sfn_worker_name` - A property that is passed to AWS Step Functions when getting activity tasks.
 
@@ -317,19 +325,19 @@ Specifies parameters for creation of the `step-functions-resources` CloudFormation stack
 ```json
 [
   {
-    "ParameterKey": "SourceS3BucketName",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "SourceS3Key",
+    "ParameterKey": "LambdaSourceS3Key",
     "ParameterValue": "src/ons3objectcreated.zip"
   },
   {
     "ParameterKey": "GlueRunnerActivityName",
     "ParameterValue": "GlueRunnerActivity"
   },
   {
-    "ParameterKey": "SourceDataBucketName",
+    "ParameterKey": "DataBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   }
 ]
@@ -340,11 +348,11 @@ Specifies parameters for creation of the `step-functions-resources` CloudFormation stack
 
 Both parameters are also used by AWS CloudFormation during stack creation.
 
-* `SourceS3BucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) to which the `ons3objectcreated` AWS Lambda function package (.zip file) will be deployed. If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.
+* `ArtifactBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix) to which the `ons3objectcreated` AWS Lambda function package (.zip file) will be deployed. If a bucket with such a name does not exist, the `deploylambda` build command will create it for you with appropriate permissions.
 
-* `SourceS3Key` - The Amazon S3 key (e.g. `src/ons3objectcreated.zip`) for your AWS Lambda function's .zip package.
+* `LambdaSourceS3Key` - The Amazon S3 key (e.g. `src/ons3objectcreated.zip`) for your AWS Lambda function's .zip package.
 
-* `SourceDataBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix). All OnS3ObjectCreated CloudWatch Events will for the bucket be handled by the `ons3objectcreated` AWS Lambda function. **This bucket will be created by CloudFormation. CloudFormation stack creation will fail if the bucket already exists.**
+* `DataBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix). All OnS3ObjectCreated CloudWatch Events will for the bucket be handled by the `ons3objectcreated` AWS Lambda function. **This bucket will be created by CloudFormation. CloudFormation stack creation will fail if the bucket already exists.**
 
 <a name="build-commands"></a>
 # Build commands
@@ -490,7 +498,15 @@ pynt createstack["athenarunner-lambda"]
 
 Note that the `step-functions-resources` stack **must** be created first, before the `glue-resources` stack.
 
-Now head to the AWS Step Functions console. Start and observe an execution of the 'MarketingAndSalesETLOrchestrator' state machine. Execution should halt at the 'Wait for XYZ Data' states. At this point, you should upload the sample .CSV files under the `samples` directory to the S3 bucket you specified as the `SourceDataBucketName` parameter value in `step-functions-resources-config.json` configuration file. This should allow the state machine to move on to next steps -- Process Sales Data and Process Marketing Data.
+Now head to the AWS Step Functions console. Start and observe an execution of the 'MarketingAndSalesETLOrchestrator' state machine. Execution should halt at the 'Wait for XYZ Data' states. At this point, you should upload the sample .CSV files under the `samples` directory to the S3 bucket you specified as the `SourceDataBucketName` parameter value in `step-functions-resources-config.json` configuration file. **Upload the marketing sample file under prefix 'marketing' and the sales sample file under prefix 'sales'. To do that, you may issue the following AWS CLI commands while at the project's root directory:**
+
+```
+aws s3 cp samples/MarketingData_QuickSightSample.csv s3://{SourceDataBucketName}/marketing/
+
+aws s3 cp samples/SalesPipeline_QuickSightSample.csv s3://{SourceDataBucketName}/sales/
+```
+
+This should allow the state machine to move on to next steps -- Process Sales Data and Process Marketing Data.
 
 If you have setup and run the sample correctly, you should see this output in the AWS Step Functions console:
````
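Note on the README's "values must match" requirement: the NOTE above says `lambda/s3-deployment-descriptor.json` and the per-stack CloudFormation params files must agree. A small helper like the following (hypothetical, not part of this commit; file paths and function names are the repository's own) can catch a mismatch before deploying:

```python
# Hypothetical consistency check between the deployment descriptor and a
# stack's CloudFormation parameters file.
import json

def params_match(cfn_params_path, descriptor_path, function_name):
    with open(cfn_params_path) as f:
        cfn = {p['ParameterKey']: p['ParameterValue'] for p in json.load(f)}
    with open(descriptor_path) as f:
        dep = json.load(f)[function_name]
    # Both files must agree on where the Lambda .zip package lives.
    return all(cfn.get(k) == dep.get(k)
               for k in ('ArtifactBucketName', 'LambdaSourceS3Key'))

if __name__ == '__main__':
    print(params_match('cloudformation/gluerunner-lambda-params.json',
                       'lambda/s3-deployment-descriptor.json', 'gluerunner'))
```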

**build.py** (+49 −42)

````diff
@@ -17,22 +17,25 @@
 import shutil
 import zipfile
 import time
+
+from pip._vendor.distlib.compat import raw_input
 from pynt import task
 import boto3
 import botocore
 from botocore.exceptions import ClientError
 import json
 import re
 
+
 def write_dir_to_zip(src, zf):
     '''Write a directory tree to an open ZipFile object.'''
     abs_src = os.path.abspath(src)
     for dirname, subdirs, files in os.walk(src):
         for filename in files:
             absname = os.path.abspath(os.path.join(dirname, filename))
             arcname = absname[len(abs_src) + 1:]
-            print 'zipping %s as %s' % (os.path.join(dirname, filename),
-                                        arcname)
+            print('zipping {} as {}'.format(os.path.join(dirname, filename),
+                                            arcname))
             zf.write(absname, arcname)
 
 def read_json(jsonf_path):
@@ -63,7 +66,7 @@ def check_bucket_exists(s3path):
 @task()
 def clean():
     '''Clean build directory.'''
-    print 'Cleaning build directory...'
+    print('Cleaning build directory...')
 
     if os.path.exists('build'):
         shutil.rmtree('build')
@@ -78,11 +81,11 @@ def packagelambda(* functions):
 
     os.chdir("build")
 
-    if(len(functions) == 0):
+    if len(functions) == 0:
         functions = ("athenarunner", "gluerunner", "ons3objectcreated")
 
     for function in functions:
-        print 'Packaging "{}" lambda function in directory'.format(function)
+        print('Packaging "{}" lambda function in directory'.format(function))
         zipf = zipfile.ZipFile("%s.zip" % function, "w", zipfile.ZIP_DEFLATED)
 
         write_dir_to_zip("../lambda/{}/".format(function), zipf)
@@ -99,7 +102,7 @@ def updatelambda(*functions):
     '''Directly update lambda function code in AWS (without upload to S3).'''
     lambda_client = boto3.client('lambda')
 
-    if(len(functions) == 0):
+    if len(functions) == 0:
         functions = ("athenarunner", "gluerunner", "ons3objectcreated")
 
     for function in functions:
@@ -115,7 +118,7 @@ def updatelambda(*functions):
 def deploylambda(*functions, **kwargs):
     '''Upload lambda functions .zip file to S3 for download by CloudFormation stack during creation.'''
 
-    if (len(functions) == 0):
+    if len(functions) == 0:
         functions = ("athenarunner", "gluerunner", "ons3objectcreated")
 
     region_name = boto3.session.Session().region_name
@@ -128,21 +131,21 @@ def deploylambda(*functions, **kwargs):
 
     for function in functions:
 
-        src_s3_bucket_name = params[function]['SourceS3BucketName']
-        src_s3_key = params[function]['SourceS3Key']
+        src_s3_bucket_name = params[function]['ArtifactBucketName']
+        src_s3_key = params[function]['LambdaSourceS3Key']
 
         if not src_s3_key and not src_s3_bucket_name:
             print(
-                "ERROR: Both Source S3 bucket name and S3 key must be specified for function '{}'. FUNCTION NOT DEPLOYED.".format(
+                "ERROR: Both Artifact S3 bucket name and Lambda source S3 key must be specified for function '{}'. FUNCTION NOT DEPLOYED.".format(
                     function))
             continue
 
         print("Checking if S3 Bucket '{}' exists...".format(src_s3_bucket_name))
 
-        if (not check_bucket_exists(src_s3_bucket_name)):
+        if not check_bucket_exists(src_s3_bucket_name):
             print("Bucket %s not found. Creating in region {}.".format(src_s3_bucket_name, region_name))
 
-            if (region_name == "us-east-1"):
+            if region_name == "us-east-1":
                 s3_client.create_bucket(
                     # ACL="authenticated-read",
                     Bucket=src_s3_bucket_name
@@ -156,7 +159,7 @@ def deploylambda(*functions, **kwargs):
                 }
             )
 
-        print "Uploading function '{}' to '{}'".format(function, src_s3_key)
+        print("Uploading function '{}' to '{}'".format(function, src_s3_key))
 
         with open('build/{}.zip'.format(function), 'rb') as data:
             s3_client.upload_fileobj(data, src_s3_bucket_name, src_s3_key)
@@ -168,8 +171,9 @@ def deploylambda(*functions, **kwargs):
 def createstack(* stacks, **kwargs):
     '''Create stacks using CloudFormation.'''
 
-    if (len(stacks) == 0):
-        print("ERROR: Please specify a stack to create. Valid values are glue-resources, gluerunner-lambda, step-functions-resources.")
+    if len(stacks) == 0:
+        print(
+            "ERROR: Please specify a stack to create. Valid values are glue-resources, gluerunner-lambda, step-functions-resources.")
         return
 
     for stack in stacks:
@@ -183,7 +187,7 @@ def createstack(* stacks, **kwargs):
 
         cfn_client = boto3.client('cloudformation')
 
-        print("Attempting to CREATE '%s' stack using CloudFormation." % (stack_name))
+        print("Attempting to CREATE '%s' stack using CloudFormation." % stack_name)
         start_t = time.time()
         response = cfn_client.create_stack(
             StackName=stack_name,
@@ -197,7 +201,7 @@ def createstack(* stacks, **kwargs):
         print("Waiting until '%s' stack status is CREATE_COMPLETE" % stack_name)
 
         try:
-
+            # cc           +o
             cfn_stack_delete_waiter = cfn_client.get_waiter('stack_create_complete')
             cfn_stack_delete_waiter.wait(StackName=stack_name)
             print("Stack CREATED in approximately %d secs." % int(time.time() - start_t))
@@ -211,8 +215,9 @@ def createstack(* stacks, **kwargs):
 def updatestack(* stacks, **kwargs):
     '''Update a CloudFormation stack.'''
 
-    if (len(stacks) == 0):
-        print("ERROR: Please specify a stack to create. Valid values are glue-resources, gluerunner-lambda, step-functions-resources.")
+    if len(stacks) == 0:
+        print(
+            "ERROR: Please specify a stack to create. Valid values are glue-resources, gluerunner-lambda, step-functions-resources.")
         return
 
     for stack in stacks:
@@ -226,7 +231,7 @@ def updatestack(* stacks, **kwargs):
 
         cfn_client = boto3.client('cloudformation')
 
-        print("Attempting to UPDATE '%s' stack using CloudFormation." % (stack_name))
+        print("Attempting to UPDATE '%s' stack using CloudFormation." % stack_name)
         try:
             start_t = time.time()
             response = cfn_client.update_stack(
@@ -244,13 +249,13 @@ def updatestack(* stacks, **kwargs):
 
             print("Stack UPDATED in approximately %d secs." % int(time.time() - start_t))
         except ClientError as e:
-            print "EXCEPTION: " + e.response["Error"]["Message"]
+            print("EXCEPTION: " + e.response["Error"]["Message"])
 
 @task()
 def stackstatus(* stacks):
     '''Check the status of a CloudFormation stack.'''
 
-    if (len(stacks) == 0):
+    if len(stacks) == 0:
         stacks = ("glue-resources", "gluerunner-lambda", "step-functions-resources")
 
     for stack in stacks:
@@ -263,18 +268,18 @@ def stackstatus(* stacks):
                 StackName=stack_name
             )
 
-            if(response["Stacks"][0]):
+            if response["Stacks"][0]:
                 print("Stack '%s' has the status '%s'" % (stack_name, response["Stacks"][0]["StackStatus"]))
 
         except ClientError as e:
-            print "EXCEPTION: " + e.response["Error"]["Message"]
+            print("EXCEPTION: " + e.response["Error"]["Message"])
 
 
 @task()
 def deletestack(* stacks):
     '''Delete stacks using CloudFormation.'''
 
-    if (len(stacks) == 0):
+    if len(stacks) == 0:
         print("ERROR: Please specify a stack to delete.")
         return
 
@@ -308,36 +313,38 @@ def deploygluescripts(**kwargs):
     glue_cfn_params = read_json("cloudformation/glue-resources-params.json")
 
     s3_etl_script_path = ''
-
+    bucket_name = ''
+    prefix = ''
     for param in glue_cfn_params:
-        if param['ParameterKey'] == 'S3ETLScriptPath':
-            s3_etl_script_path = param['ParameterValue']
+        if param['ParameterKey'] == 'ArtifactBucketName':
+            bucket_name = param['ParameterValue']
+        if param['ParameterKey'] == 'ETLScriptsPrefix':
+            prefix = param['ParameterValue']
 
-    if not s3_etl_script_path:
+    if not bucket_name or not prefix:
         print(
-            "ERROR: S3ETLScriptPath must be set in 'cloudformation/glue-resources-params.json'.")
+            "ERROR: ArtifactBucketName and ETLScriptsPrefix must be set in 'cloudformation/glue-resources-params.json'.")
         return
 
+    s3_etl_script_path = 's3://' + bucket_name + '/' + prefix
+
     result = re.search('s3://(.+?)/(.*)', s3_etl_script_path)
-    if(result is None):
-        print("ERROR: S3ETLScriptPath is malformed.")
+    if result is None:
+        print("ERROR: Invalid S3 ETL bucket name and/or script prefix.")
         return
 
-    s3_bucket_name = result.group(1)
-    s3_key = result.group(2)
-
-    print("Checking if S3 Bucket '{}' exists...".format(s3_bucket_name))
+    print("Checking if S3 Bucket '{}' exists...".format(bucket_name))
 
-    if (not check_bucket_exists(s3_bucket_name)):
-        print("ERROR: S3 bucket for path '{}' not found.".format(s3_etl_script_path))
+    if not check_bucket_exists(bucket_name):
+        print("ERROR: S3 bucket '{}' not found.".format(bucket_name))
         return
 
     for dirname, subdirs, files in os.walk(glue_scripts_path):
         for filename in files:
             absname = os.path.abspath(os.path.join(dirname, filename))
-            print "Uploading AWS Glue script '{}' to '{}/{}'".format(absname, s3_bucket_name, s3_key)
+            print("Uploading AWS Glue script '{}' to '{}/{}'".format(absname, bucket_name, prefix))
             with open(absname, 'rb') as data:
-                s3_client.upload_fileobj(data, s3_bucket_name, '{}/{}'.format(s3_key, filename))
+                s3_client.upload_fileobj(data, bucket_name, '{}/{}'.format(prefix, filename))
 
     return
 
@@ -348,9 +355,9 @@ def deletes3bucket(name):
 
     proceed = raw_input(
         "This command will DELETE ALL DATA in S3 bucket '%s' and the BUCKET ITSELF.\nDo you wish to continue? [Y/N] " \
-        % (name))
+        % name)
 
-    if (proceed.lower() != 'y'):
+    if proceed.lower() != 'y':
         print("Aborting deletion.")
         return
````
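Note on the new import: `from pip._vendor.distlib.compat import raw_input` makes `raw_input` resolve under both Python 2 and 3, but it reaches into pip's private vendored packages, which can move between pip releases. A dependency-free shim (a suggestion, not what this commit does) would be:

```python
# Portable raw_input without relying on pip internals.
try:
    input_func = raw_input  # Python 2
except NameError:
    input_func = input      # Python 3

proceed = input_func("Do you wish to continue? [Y/N] ")
```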

**cloudformation/athenarunner-lambda-params.json** (+2 −2)

````diff
@@ -1,10 +1,10 @@
 [
   {
-    "ParameterKey": "SourceS3BucketName",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "SourceS3Key",
+    "ParameterKey": "LambdaSourceS3Key",
     "ParameterValue": "src/athenarunner.zip"
   },
   {
````

**cloudformation/athenarunner-lambda.yaml** (+51 −40)

````diff
@@ -29,12 +29,12 @@ Parameters:
     Default: "athenarunner"
     Description: "Name of the Lambda function that mediates between AWS Step Functions and AWS Athena."
 
-  SourceS3BucketName:
+  ArtifactBucketName:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 bucket containing source .zip files."
 
-  SourceS3Key:
+  LambdaSourceS3Key:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 key of Athena Runner lambda function .zip file."
@@ -53,46 +53,59 @@ Resources:
             - lambda.amazonaws.com
           Action:
           - sts:AssumeRole
-      ManagedPolicyArns:
-      - arn:aws:iam::aws:policy/AmazonS3FullAccess
-      - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
-      - arn:aws:iam::aws:policy/AmazonSNSFullAccess
-      - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
-      - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
-
       Path: "/"
 
-  AmazonAthenaFullAccessPolicy:
-    Type: "AWS::IAM::Policy"
-    Properties:
-      PolicyDocument: {
-        "Version": "2012-10-17",
-        "Statement": [
-          {
-            "Effect": "Allow",
-            "Action": "athena:*",
-            "Resource": "*"
-          }
-        ]
-      }
-      PolicyName: "AmazonAthenaFullAccessForAthenaRunner"
-      Roles:
-        - !Ref AthenaRunnerLambdaExecutionRole
-
-  AWSGlueFullAccessPolicy:
+  AthenaRunnerPolicy:
     Type: "AWS::IAM::Policy"
     Properties:
       PolicyDocument: {
-        "Version": "2012-10-17",
-        "Statement": [
-          {
-            "Effect": "Allow",
-            "Action": "glue:*",
-            "Resource": "*"
-          }
-        ]
+        "Version": "2012-10-17",
+        "Statement": [{
+            "Effect": "Allow",
+            "Action": [
+              "dynamodb:GetItem",
+              "dynamodb:Query",
+              "dynamodb:PutItem",
+              "dynamodb:UpdateItem",
+              "dynamodb:DeleteItem"
+            ],
+            "Resource": !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${DDBTableName}"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "logs:CreateLogStream",
+              "logs:PutLogEvents"
+            ],
+            "Resource": !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": "logs:CreateLogGroup",
+            "Resource": "*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "states:SendTaskSuccess",
+              "states:SendTaskFailure",
+              "states:SendTaskHeartbeat",
+              "states:GetActivityTask"
+            ],
+            "Resource": "*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "athena:StartQueryExecution",
+              "athena:GetQueryExecution",
+              "athena:GetNamedQuery"
+            ],
+            "Resource": "*"
+          }
+        ]
       }
-      PolicyName: "AWSGlueFullAccessForAthenaRunner"
+      PolicyName: "AthenaRunnerPolicy"
       Roles:
         - !Ref AthenaRunnerLambdaExecutionRole
 
@@ -104,14 +117,12 @@ Resources:
       Handler: "athenarunner.handler"
       Role: !GetAtt AthenaRunnerLambdaExecutionRole.Arn
       Code:
-        S3Bucket: !Ref SourceS3BucketName
-        S3Key: !Ref SourceS3Key
+        S3Bucket: !Ref ArtifactBucketName
+        S3Key: !Ref LambdaSourceS3Key
       Timeout: 180 #seconds
       MemorySize: 128 #MB
       Runtime: python2.7
       DependsOn:
-        - AmazonAthenaFullAccessPolicy
-        - AWSGlueFullAccessPolicy
         - AthenaRunnerLambdaExecutionRole
 
   ScheduledRule:
````
**cloudformation/glue-resources-params.json** (+8 −4)

````diff
@@ -1,14 +1,18 @@
 [
   {
-    "ParameterKey": "S3ETLScriptPath",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "S3ETLOutputPath",
-    "ParameterValue": "<NO-DEFAULT>"
+    "ParameterKey": "ETLScriptsPrefix",
+    "ParameterValue": "scripts"
   },
   {
-    "ParameterKey": "SourceDataBucketName",
+    "ParameterKey": "DataBucketName",
     "ParameterValue": "<NO-DEFAULT>"
+  },
+  {
+    "ParameterKey": "ETLOutputPrefix",
+    "ParameterValue": "output"
   }
 ]
````

**cloudformation/glue-resources.yaml** (+45 −43)

````diff
@@ -38,60 +38,62 @@ Parameters:
     Default: "marketing_qs"
     Description: "Name of the Marketing data table in AWS Glue."
 
-  S3ETLScriptPath:
+  ETLScriptsPrefix:
     Type: String
-    MinLength: "4"
+    MinLength: "1"
     Description: "Location of the Glue job ETL scripts in S3."
 
-  S3ETLOutputPath:
+  ETLOutputPrefix:
     Type: String
-    MinLength: "10"
+    MinLength: "1"
     Description: "Name of the S3 output path to which this CloudFormation template's AWS Glue jobs are going to write ETL output."
 
-  SourceDataBucketName:
+  DataBucketName:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 bucket in which the source Marketing and Sales data will be uploaded. Bucket is created by this CFT."
 
+  ArtifactBucketName:
+    Type: String
+    MinLength: "1"
+    Description: "Name of the S3 bucket in which the Marketing and Sales ETL scripts reside. Bucket is NOT created by this CFT."
+
 Resources:
 
   ### AWS GLUE RESOURCES ###
-  AWSGlueCrawlerRole:
-    Type: "AWS::IAM::Role"
-    Properties:
-      AssumeRolePolicyDocument:
-        Version: '2012-10-17'
-        Statement:
-        - Effect: Allow
-          Principal:
-            Service:
-            - glue.amazonaws.com
-          Action:
-          - sts:AssumeRole
-      ManagedPolicyArns:
-      - arn:aws:iam::aws:policy/AmazonS3FullAccess
-      - arn:aws:iam::aws:policy/AdministratorAccess
-      - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
-      Path: "/"
-
   AWSGlueJobRole:
     Type: "AWS::IAM::Role"
     Properties:
       AssumeRolePolicyDocument:
         Version: '2012-10-17'
         Statement:
-        - Effect: Allow
-          Principal:
-            Service:
-            - glue.amazonaws.com
-          Action:
-          - sts:AssumeRole
+          - Effect: Allow
+            Principal:
+              Service:
+                - glue.amazonaws.com
+            Action:
+              - sts:AssumeRole
+      Policies:
+        - PolicyName: root
+          PolicyDocument:
+            Version: 2012-10-17
+            Statement:
+              - Effect: Allow
+                Action:
+                  - "s3:GetObject"
+                  - "s3:PutObject"
+                  - "s3:ListBucket"
+                  - "s3:DeleteObject"
+                Resource:
+                  - !Sub "arn:aws:s3:::${DataBucketName}"
+                  - !Sub "arn:aws:s3:::${DataBucketName}/*"
+                  - !Sub "arn:aws:s3:::${ArtifactBucketName}"
+                  - !Sub "arn:aws:s3:::${ArtifactBucketName}/*"
       ManagedPolicyArns:
-      - arn:aws:iam::aws:policy/AmazonS3FullAccess
-      - arn:aws:iam::aws:policy/AdministratorAccess
-      - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
+        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
       Path: "/"
 
+
   MarketingAndSalesDatabase:
     Type: "AWS::Glue::Database"
     Properties:
@@ -170,7 +172,7 @@
         }
         SerializationLibrary: "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
         Compressed: False
-        Location: !Sub "s3://${SourceDataBucketName}/sales/"
+        Location: !Sub "s3://${DataBucketName}/sales/"
         Retention: 0
       Name: !Ref SalesPipelineTableName
       DatabaseName: !Ref MarketingAndSalesDatabaseName
@@ -260,7 +262,7 @@
         }
         SerializationLibrary: "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
         Compressed: False
-        Location: !Sub "s3://${SourceDataBucketName}/marketing/"
+        Location: !Sub "s3://${DataBucketName}/marketing/"
         Retention: 0
       Name: !Ref MarketingTableName
       DatabaseName: !Ref MarketingAndSalesDatabaseName
@@ -273,12 +275,12 @@
       Name: "ProcessSalesData"
       Command: {
         "Name" : "glueetl",
-        "ScriptLocation" : !Sub "${S3ETLScriptPath}/process_sales_data.py"
+        "ScriptLocation": !Sub "s3://${ArtifactBucketName}/${ETLScriptsPrefix}/process_sales_data.py"
       }
       DefaultArguments: {
         "--database_name" : !Ref MarketingAndSalesDatabaseName,
         "--table_name" : !Ref SalesPipelineTableName,
-        "--s3_output_path" : !Sub "${S3ETLOutputPath}/tmp/sales"
+        "--s3_output_path": !Sub "s3://${DataBucketName}/${ETLOutputPrefix}/tmp/sales"
       }
       MaxRetries: 0
       Description: "Process Sales Pipeline data."
@@ -291,12 +293,12 @@
       Name: "ProcessMarketingData"
       Command: {
         "Name" : "glueetl",
-        "ScriptLocation" : !Sub "${S3ETLScriptPath}/process_marketing_data.py"
+        "ScriptLocation": !Sub "s3://${ArtifactBucketName}/${ETLScriptsPrefix}/process_marketing_data.py"
       }
       DefaultArguments: {
         "--database_name" : !Ref MarketingAndSalesDatabaseName,
         "--table_name" : !Ref MarketingTableName,
-        "--s3_output_path" : !Sub "${S3ETLOutputPath}/tmp/marketing"
+        "--s3_output_path": !Sub "s3://${DataBucketName}/${ETLOutputPrefix}/tmp/marketing"
       }
       MaxRetries: 0
       Description: "Process Marketing data."
@@ -309,13 +311,13 @@
       Name: "JoinMarketingAndSalesData"
       Command: {
         "Name" : "glueetl",
-        "ScriptLocation" : !Sub "${S3ETLScriptPath}/join_marketing_and_sales_data.py"
+        "ScriptLocation": !Sub "s3://${ArtifactBucketName}/${ETLScriptsPrefix}/join_marketing_and_sales_data.py"
       }
       DefaultArguments: {
-        "--database_name" : !Ref MarketingAndSalesDatabaseName,
-        "--s3_output_path" : !Sub "${S3ETLOutputPath}/sales-leads-influenced",
-        "--s3_sales_data_path" : !Sub "${S3ETLOutputPath}/tmp/sales",
-        "--s3_marketing_data_path" : !Sub "${S3ETLOutputPath}/tmp/marketing"
+        "--database_name": !Ref MarketingAndSalesDatabaseName,
+        "--s3_output_path": !Sub "s3://${DataBucketName}/${ETLOutputPrefix}/sales-leads-influenced",
+        "--s3_sales_data_path": !Sub "s3://${DataBucketName}/${ETLOutputPrefix}/tmp/sales",
+        "--s3_marketing_data_path": !Sub "s3://${DataBucketName}/${ETLOutputPrefix}/tmp/marketing"
       }
       MaxRetries: 0
       Description: "Join Marketing and Sales data."
````

**cloudformation/gluerunner-lambda-params.json** (+2 −2)

````diff
@@ -1,10 +1,10 @@
 [
   {
-    "ParameterKey": "SourceS3BucketName",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "SourceS3Key",
+    "ParameterKey": "LambdaSourceS3Key",
     "ParameterValue": "src/gluerunner.zip"
   },
   {
````

**cloudformation/gluerunner-lambda.yaml** (+50 −22)

````diff
@@ -29,12 +29,12 @@ Parameters:
     Default: "gluerunner"
     Description: "Name of the Lambda function that mediates between AWS Step Functions and AWS Glue."
 
-  SourceS3BucketName:
+  ArtifactBucketName:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 bucket containing source .zip files."
 
-  SourceS3Key:
+  LambdaSourceS3Key:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 key of Glue Runner lambda function .zip file."
@@ -53,29 +53,58 @@ Resources:
             - lambda.amazonaws.com
           Action:
           - sts:AssumeRole
-      ManagedPolicyArns:
-      - arn:aws:iam::aws:policy/AmazonS3FullAccess
-      - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
-      - arn:aws:iam::aws:policy/AmazonSNSFullAccess
-      - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
-      - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
-
       Path: "/"
 
-  AWSGlueFullAccessPolicy:
+  GlueRunnerPolicy:
     Type: "AWS::IAM::Policy"
     Properties:
       PolicyDocument: {
-        "Version": "2012-10-17",
-        "Statement": [
-          {
-            "Effect": "Allow",
-            "Action": "glue:*",
-            "Resource": "*"
-          }
-        ]
+        "Version": "2012-10-17",
+        "Statement": [{
+            "Effect": "Allow",
+            "Action": [
+              "dynamodb:GetItem",
+              "dynamodb:Query",
+              "dynamodb:PutItem",
+              "dynamodb:UpdateItem",
+              "dynamodb:DeleteItem"
+            ],
+            "Resource": !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${DDBTableName}"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "logs:CreateLogStream",
+              "logs:PutLogEvents"
+            ],
+            "Resource": !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": "logs:CreateLogGroup",
+            "Resource": "*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "states:SendTaskSuccess",
+              "states:SendTaskFailure",
+              "states:SendTaskHeartbeat",
+              "states:GetActivityTask"
+            ],
+            "Resource": "*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "glue:StartJobRun",
+              "glue:GetJobRun"
+            ],
+            "Resource": "*"
+          }
+        ]
       }
-      PolicyName: "AWSGlueFullAccessForGlueRunner"
+      PolicyName: "GlueRunnerPolicy"
      Roles:
         - !Ref GlueRunnerLambdaExecutionRole
 
@@ -87,13 +116,12 @@ Resources:
       Handler: "gluerunner.handler"
       Role: !GetAtt GlueRunnerLambdaExecutionRole.Arn
       Code:
-        S3Bucket: !Ref SourceS3BucketName
-        S3Key: !Ref SourceS3Key
+        S3Bucket: !Ref ArtifactBucketName
+        S3Key: !Ref LambdaSourceS3Key
       Timeout: 180 #seconds
       MemorySize: 128 #MB
       Runtime: python2.7
       DependsOn:
-        - AWSGlueFullAccessPolicy
         - GlueRunnerLambdaExecutionRole
 
   ScheduledRule:
````
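Note on the least-privilege change: the inline `GlueRunnerPolicy` above replaces five broad managed policies with only the DynamoDB, CloudWatch Logs, Step Functions, and Glue actions the function actually calls. A narrowed policy can be sanity-checked with the IAM policy simulator before deploying (a sketch, not part of this commit):

```python
# Verify the narrowed policy still allows the two Glue actions Glue Runner needs.
import json
import boto3

policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["glue:StartJobRun", "glue:GetJobRun"],
        "Resource": "*"
    }]
}

iam = boto3.client('iam')
resp = iam.simulate_custom_policy(
    PolicyInputList=[json.dumps(policy)],
    ActionNames=['glue:StartJobRun', 'glue:GetJobRun']
)
for result in resp['EvaluationResults']:
    print(result['EvalActionName'], result['EvalDecision'])  # expect 'allowed'
```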
**cloudformation/step-functions-resources-params.json**

````diff
@@ -1,18 +1,18 @@
 [
   {
-    "ParameterKey": "SourceS3BucketName",
+    "ParameterKey": "ArtifactBucketName",
     "ParameterValue": "<NO-DEFAULT>"
   },
   {
-    "ParameterKey": "SourceS3Key",
+    "ParameterKey": "LambdaSourceS3Key",
     "ParameterValue": "src/ons3objectcreated.zip"
   },
   {
     "ParameterKey": "GlueRunnerActivityName",
     "ParameterValue": "GlueRunnerActivity"
   },
   {
-    "ParameterKey": "SourceDataBucketName",
-    "ParameterValue": "<NO-DEFAULT>"
+    "ParameterKey": "DataBucketName",
+    "ParameterValue": "etl-orchestrator-745-data"
   }
 ]
````

**cloudformation/step-functions-resources.yaml** (+55 −19)

````diff
@@ -48,17 +48,17 @@ Parameters:
     Default: "AthenaRunnerActivity"
     Description: "Name of the AWS Step Functions activity to be polled by AthenaRunner."
 
-  SourceS3BucketName:
+  ArtifactBucketName:
     Type: String
     MinLength: "1"
-    Description: "Name of the S3 bucket containing source .zip files."
+    Description: "Name of the S3 bucket containing source .zip files. Bucket is NOT created by this CFT."
 
-  SourceS3Key:
+  LambdaSourceS3Key:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 key of Glue Runner lambda function .zip file."
 
-  SourceDataBucketName:
+  DataBucketName:
     Type: String
     MinLength: "1"
     Description: "Name of the S3 bucket in which the source Marketing and Sales data will be uploaded. Bucket is created by this CFT."
@@ -104,12 +104,12 @@ Resources:
   WaitForSalesDataActivity:
     Type: "AWS::StepFunctions::Activity"
     Properties:
-      Name: !Sub "${SourceDataBucketName}-SalesPipeline_QuickSightSample.csv"
+      Name: !Sub "${DataBucketName}-SalesPipeline_QuickSightSample.csv"
 
   WaitForMarketingDataActivity:
     Type: "AWS::StepFunctions::Activity"
     Properties:
-      Name: !Sub "${SourceDataBucketName}-MarketingData_QuickSightSample.csv"
+      Name: !Sub "${DataBucketName}-MarketingData_QuickSightSample.csv"
 
 
   # State Machine resources
@@ -141,9 +141,9 @@ Resources:
         GlueDatabaseName: !Ref MarketingAndSalesDatabaseName,
         GlueTableName: !Ref MarketingTableName,
         AthenaRunnerActivityArn: !Ref AthenaRunnerActivity,
-        AthenaResultOutputLocation: !Sub "s3://${SourceDataBucketName}/athena-runner-output/",
+        AthenaResultOutputLocation: !Sub "s3://${DataBucketName}/athena-runner-output/",
         AthenaResultEncryptionOption: "SSE_S3"
-      }
+      }
       RoleArn: !GetAtt StateExecutionRole.Arn
 
   MarketingAndSalesETLOrchestrator:
@@ -256,16 +256,53 @@ Resources:
         - Effect: Allow
           Principal:
             Service:
-            - lambda.amazonaws.com
+              - lambda.amazonaws.com
           Action:
-          - sts:AssumeRole
+            - sts:AssumeRole
       ManagedPolicyArns:
       - arn:aws:iam::aws:policy/AmazonS3FullAccess
       - arn:aws:iam::aws:policy/AmazonSNSFullAccess
       - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
       - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
       Path: "/"
 
+  OnS3ObjectCreatedPolicy:
+    Type: "AWS::IAM::Policy"
+    Properties:
+      PolicyDocument: {
+        "Version": "2012-10-17",
+        "Statement": [
+          {
+            "Effect": "Allow",
+            "Action": [
+              "logs:CreateLogStream",
+              "logs:PutLogEvents"
+            ],
+            "Resource": !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": "logs:CreateLogGroup",
+            "Resource": "*"
+          },
+          {
+            "Effect": "Allow",
+            "Action": [
+              "states:SendTaskSuccess",
+              "states:SendTaskFailure",
+              "states:SendTaskHeartbeat",
+              "states:GetActivityTask"
+            ],
+            "Resource": "*"
+          }
+        ]
+      }
+      PolicyName: "OnS3ObjectCreatedPolicy"
+      Roles:
+        - !Ref OnS3ObjectCreatedLambdaExecutionRole
+
+
+
   OnS3ObjectCreatedLambdaFunction:
     Type: "AWS::Lambda::Function"
     Properties:
@@ -274,8 +311,8 @@ Resources:
       Handler: "ons3objectcreated.handler"
       Role: !GetAtt OnS3ObjectCreatedLambdaExecutionRole.Arn
       Code:
-        S3Bucket: !Ref SourceS3BucketName
-        S3Key: !Ref SourceS3Key
+        S3Bucket: !Ref ArtifactBucketName
+        S3Key: !Ref LambdaSourceS3Key
       Timeout: 180 #seconds
       MemorySize: 128 #MB
       Runtime: python2.7
@@ -284,14 +321,13 @@ Resources:
 
   # For every bucket that needs to invoke OnS3ObjectCreated:
 
-  SourceDataBucket:
+  DataBucket:
     Type: "AWS::S3::Bucket"
     Properties:
-      BucketName: !Ref SourceDataBucketName
+      BucketName: !Ref DataBucketName
       NotificationConfiguration:
         LambdaConfigurations:
-          -
-            Function: !GetAtt OnS3ObjectCreatedLambdaFunction.Arn
+          - Function: !GetAtt OnS3ObjectCreatedLambdaFunction.Arn
             Event: "s3:ObjectCreated:*"
             Filter:
               S3Key:
@@ -300,13 +336,13 @@ Resources:
                   Name: "suffix"
                   Value: "csv"
     DependsOn:
-      - SourceDataBucketPermission
+      - DataBucketPermission
 
-  SourceDataBucketPermission:
+  DataBucketPermission:
     Type: "AWS::Lambda::Permission"
     Properties:
       Action: 'lambda:InvokeFunction'
       FunctionName: !Ref OnS3ObjectCreatedLambdaFunction
       Principal: s3.amazonaws.com
       SourceAccount: !Ref "AWS::AccountId"
-      SourceArn: !Sub "arn:aws:s3:::${SourceDataBucketName}"
+      SourceArn: !Sub "arn:aws:s3:::${DataBucketName}"
````
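Note on the `states:*` permissions: they are exactly what the Step Functions activity protocol requires; a worker long-polls `GetActivityTask`, then reports heartbeat/success/failure with the task token. A minimal sketch of that protocol (the activity ARN and worker name below are hypothetical):

```python
import boto3
from botocore.client import Config

# GetActivityTask long-polls for up to 60 seconds, so the read timeout
# must be larger than that.
sfn = boto3.client('stepfunctions', config=Config(read_timeout=70))

task = sfn.get_activity_task(
    activityArn='arn:aws:states:us-east-1:111122223333:activity:GlueRunnerActivity',
    workerName='gluerunner'
)
token = task.get('taskToken')
if token:
    sfn.send_task_heartbeat(taskToken=token)             # task still in progress
    sfn.send_task_success(taskToken=token, output='{}')  # report completion
```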

**glue-scripts/process_marketing_data.py** (−1)

````diff
@@ -45,7 +45,6 @@
     ('return visitors', 'bigint', 'return_visitors', 'bigint'),
 ], transformation_ctx='applymapping1')
 
-print 'Count: ', mktg_DyF.count()
 mktg_DyF.printSchema()
 
 mktg_DF = mktg_DyF.toDF()
````

**glue-scripts/process_sales_data.py** (−1)

````diff
@@ -51,7 +51,6 @@
     ('last status entry', 'string', 'last_status_entry', 'string'),
 ], transformation_ctx='applymapping1')
 
-print 'Count: ', sales_DyF.count()
 sales_DyF.printSchema()
 
 sales_DF = sales_DyF.toDF()
````

**lambda/athenarunner/athenarunner.py** (+91 −79)

````diff
@@ -34,7 +34,7 @@ def load_config():
 def is_json(jsonstring):
     try:
         json_object = json.loads(jsonstring)
-    except ValueError, e:
+    except ValueError:
         return False
     return True
 
@@ -177,102 +177,114 @@ def check_athena_queries(config):
         athena_query_execution_id = item['athena_query_execution_id']
         sfn_task_token = item['sfn_task_token']
 
-        logger.debug('Polling Athena query execution status..')
-
-        # Query athena query execution status...
-        athena_resp = athena.get_query_execution(
-            QueryExecutionId=athena_query_execution_id
-        )
-
-        query_exec_resp = athena_resp['QueryExecution']
-        query_exec_state = query_exec_resp['Status']['State']
-        query_state_change_reason = query_exec_resp['Status'].get('StateChangeReason', '')
-
-        logger.debug('Query with Execution Id {} is currently in state "{}"'.format(query_exec_state, query_state_change_reason))
-
-        # If Athena query completed, return success:
-        if query_exec_state in ['SUCCEEDED']:
-
-            logger.info('Query with Execution Id {} SUCCEEDED.'.format(athena_query_execution_id))
-
-            # Build an output dict and format it as JSON
-            task_output_dict = {
-                "AthenaQueryString": query_exec_resp['Query'],
-                "AthenaQueryExecutionId": athena_query_execution_id,
-                "AthenaQueryExecutionState": query_exec_state,
-                "AthenaQueryExecutionStateChangeReason": query_state_change_reason,
-                "AthenaQuerySubmissionDateTime": query_exec_resp['Status'].get('SubmissionDateTime', '').strftime('%x, %-I:%M %p %Z'),
-                "AthenaQueryCompletionDateTime": query_exec_resp['Status'].get('CompletionDateTime', '').strftime(
-                    '%x, %-I:%M %p %Z'),
-                "AthenaQueryEngineExecutionTimeInMillis": query_exec_resp['Statistics'].get('EngineExecutionTimeInMillis', 0),
-                "AthenaQueryDataScannedInBytes": query_exec_resp['Statistics'].get('DataScannedInBytes', 0)
-            }
-
-            task_output_json = json.dumps(task_output_dict)
-
-            logger.info('Sending "Task Succeeded" signal to Step Functions..')
-            sfn_resp = sfn.send_task_success(
-                taskToken=sfn_task_token,
-                output=task_output_json
-            )
-
-            # Delete item
-            resp = ddb_table.delete_item(
-                Key={
-                    'sfn_activity_arn': sfn_activity_arn,
-                    'athena_query_execution_id': athena_query_execution_id
-                }
-            )
-
-            # Task succeeded, next item
-
-        elif query_exec_state in ['RUNNING', 'QUEUED']:
-            logger.debug('Query with Execution Id {} is in state hasn\'t completed yet.'.format(athena_query_execution_id))
-
-            # Send heartbeat
-            sfn_resp = sfn.send_task_heartbeat(
-                taskToken=sfn_task_token
-            )
-
-            logger.debug('Heartbeat sent to Step Functions.')
-
-            # Heartbeat sent, next item
-
-        elif query_exec_state in ['FAILED', 'CANCELLED']:
-
-            message = 'Athena query with Execution Id "{}" failed. Last state: {}. Error message: {}'\
-                .format(athena_query_execution_id, query_exec_state, query_state_change_reason)
-
-            logger.error(message)
-
-            message_json={
-                "AthenaQueryString": query_exec_resp['Query'],
-                "AthenaQueryExecutionId": athena_query_execution_id,
-                "AthenaQueryExecutionState": query_exec_state,
-                "AthenaQueryExecutionStateChangeReason": query_state_change_reason,
-                "AthenaQuerySubmissionDateTime": query_exec_resp['Status'].get('SubmissionDateTime', '').strftime('%x, %-I:%M %p %Z'),
-                "AthenaQueryCompletionDateTime": query_exec_resp['Status'].get('CompletionDateTime', '').strftime(
-                    '%x, %-I:%M %p %Z'),
-                "AthenaQueryEngineExecutionTimeInMillis": query_exec_resp['Statistics'].get('EngineExecutionTimeInMillis', 0),
-                "AthenaQueryDataScannedInBytes": query_exec_resp['Statistics'].get('DataScannedInBytes', 0)
-            }
-
-            sfn_resp = sfn.send_task_failure(
-                taskToken=sfn_task_token,
-                cause=json.dumps(message_json),
-                error='AthenaQueryFailedError'
-            )
-
-            # Delete item
-            resp = ddb_table.delete_item(
-                Key={
-                    'sfn_activity_arn': sfn_activity_arn,
-                    'athena_query_execution_id': athena_query_execution_id
-                }
-            )
-
-            logger.error(message)
+        try:
+            logger.debug('Polling Athena query execution status..')
+
+            # Query athena query execution status...
+            athena_resp = athena.get_query_execution(
+                QueryExecutionId=athena_query_execution_id
+            )
+
+            query_exec_resp = athena_resp['QueryExecution']
+            query_exec_state = query_exec_resp['Status']['State']
+            query_state_change_reason = query_exec_resp['Status'].get('StateChangeReason', '')
+
+            logger.debug('Query with Execution Id {} is currently in state "{}"'.format(query_exec_state,
+                                                                                        query_state_change_reason))
+
+            # If Athena query completed, return success:
+            if query_exec_state in ['SUCCEEDED']:
+
+                logger.info('Query with Execution Id {} SUCCEEDED.'.format(athena_query_execution_id))
+
+                # Build an output dict and format it as JSON
+                task_output_dict = {
+                    "AthenaQueryString": query_exec_resp['Query'],
+                    "AthenaQueryExecutionId": athena_query_execution_id,
+                    "AthenaQueryExecutionState": query_exec_state,
+                    "AthenaQueryExecutionStateChangeReason": query_state_change_reason,
+                    "AthenaQuerySubmissionDateTime": query_exec_resp['Status'].get('SubmissionDateTime', '').strftime(
+                        '%x, %-I:%M %p %Z'),
+                    "AthenaQueryCompletionDateTime": query_exec_resp['Status'].get('CompletionDateTime', '').strftime(
+                        '%x, %-I:%M %p %Z'),
+                    "AthenaQueryEngineExecutionTimeInMillis": query_exec_resp['Statistics'].get(
+                        'EngineExecutionTimeInMillis', 0),
+                    "AthenaQueryDataScannedInBytes": query_exec_resp['Statistics'].get('DataScannedInBytes', 0)
+                }
+
+                task_output_json = json.dumps(task_output_dict)
+
+                logger.info('Sending "Task Succeeded" signal to Step Functions..')
+                sfn_resp = sfn.send_task_success(
+                    taskToken=sfn_task_token,
+                    output=task_output_json
+                )
+
+                # Delete item
+                resp = ddb_table.delete_item(
+                    Key={
+                        'sfn_activity_arn': sfn_activity_arn,
+                        'athena_query_execution_id': athena_query_execution_id
+                    }
+                )
+
+                # Task succeeded, next item
+
+            elif query_exec_state in ['RUNNING', 'QUEUED']:
+                logger.debug(
+                    'Query with Execution Id {} is in state hasn\'t completed yet.'.format(athena_query_execution_id))
+
+                # Send heartbeat
+                sfn_resp = sfn.send_task_heartbeat(
+                    taskToken=sfn_task_token
+                )
+
+                logger.debug('Heartbeat sent to Step Functions.')
+
+                # Heartbeat sent, next item
+
+            elif query_exec_state in ['FAILED', 'CANCELLED']:
+
+                message = 'Athena query with Execution Id "{}" failed. Last state: {}. Error message: {}' \
+                    .format(athena_query_execution_id, query_exec_state, query_state_change_reason)
+
+                logger.error(message)
+
+                message_json = {
+                    "AthenaQueryString": query_exec_resp['Query'],
+                    "AthenaQueryExecutionId": athena_query_execution_id,
+                    "AthenaQueryExecutionState": query_exec_state,
+                    "AthenaQueryExecutionStateChangeReason": query_state_change_reason,
+                    "AthenaQuerySubmissionDateTime": query_exec_resp['Status'].get('SubmissionDateTime', '').strftime(
+                        '%x, %-I:%M %p %Z'),
+                    "AthenaQueryCompletionDateTime": query_exec_resp['Status'].get('CompletionDateTime', '').strftime(
+                        '%x, %-I:%M %p %Z'),
+                    "AthenaQueryEngineExecutionTimeInMillis": query_exec_resp['Statistics'].get(
+                        'EngineExecutionTimeInMillis', 0),
+                    "AthenaQueryDataScannedInBytes": query_exec_resp['Statistics'].get('DataScannedInBytes', 0)
+                }
+
+                sfn_resp = sfn.send_task_failure(
+                    taskToken=sfn_task_token,
+                    cause=json.dumps(message_json),
+                    error='AthenaQueryFailedError'
+                )
+
+                # Delete item
+                resp = ddb_table.delete_item(
+                    Key={
+                        'sfn_activity_arn': sfn_activity_arn,
+                        'athena_query_execution_id': athena_query_execution_id
+                    }
+                )
+
+                logger.error(message)
+
+        except Exception as e:
+            logger.error('There was a problem checking status of Athena query..')
+            logger.error('Glue job Run Id "{}"'.format(athena_query_execution_id))
+            logger.error('Reason: {}'.format(e.message))
+            logger.info('Checking next Athena query.')
 
         # Task failed, next item
````
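Note on the new `except` blocks in both runners: they log `e.message`, which exists on Python 2 exceptions (the templates pin these functions to the `python2.7` runtime). If the runtime is ever upgraded, `str(e)` is the portable spelling:

```python
import logging

logger = logging.getLogger(__name__)

try:
    raise RuntimeError('boom')
except Exception as e:
    # e.message is Python 2 only; str(e) works on Python 2 and 3.
    logger.error('Reason: {}'.format(str(e)))
```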

**lambda/gluerunner/gluerunner-config.json** (+1 −1)

````diff
@@ -1,5 +1,5 @@
 {
-  "sfn_activity_arn": "<NO-DEFAULT>",
+  "sfn_activity_arn": "arn:aws:states:<AWS-REGION>:<AWS-ACCOUNTID>:activity:GlueRunnerActivity",
   "sfn_worker_name": "gluerunner",
   "ddb_table": "GlueRunnerActiveJobs",
   "ddb_query_limit": 50,
````
"ddb_query_limit": 50,

**lambda/gluerunner/gluerunner.py** (+76 −72)

````diff
@@ -34,7 +34,7 @@ def load_config():
 def is_json(jsonstring):
     try:
         json_object = json.loads(jsonstring)
-    except ValueError, e:
+    except ValueError:
         return False
     return True
 
@@ -146,102 +146,106 @@ def check_glue_jobs(config):
 
     # For each item...
     for item in ddb_resp['Items']:
-
         glue_job_run_id = item['glue_job_run_id']
         glue_job_name = item['glue_job_name']
         sfn_task_token = item['sfn_task_token']
 
-        logger.debug('Polling Glue job run status..')
-
-        # Query glue job status...
-        glue_resp = glue.get_job_run(
-            JobName=glue_job_name,
-            RunId=glue_job_run_id,
-            PredecessorsIncluded=False
-        )
-
-        job_run_state = glue_resp['JobRun']['JobRunState']
-        job_run_error_message = glue_resp['JobRun'].get('ErrorMessage', '')
-
-        logger.debug('Job with Run Id {} is currently in state "{}"'.format(glue_job_run_id, job_run_state))
-
-        # If Glue job completed, return success:
-        if job_run_state in ['SUCCEEDED']:
-
-            logger.info('Job with Run Id {} SUCCEEDED.'.format(glue_job_run_id))
-
-            # Build an output dict and format it as JSON
-            task_output_dict = {
-                "GlueJobName": glue_job_name,
-                "GlueJobRunId": glue_job_run_id,
-                "GlueJobRunState": job_run_state,
-                "GlueJobStartedOn": glue_resp['JobRun'].get('StartedOn', '').strftime('%x, %-I:%M %p %Z'),
-                "GlueJobCompletedOn": glue_resp['JobRun'].get('CompletedOn', '').strftime('%x, %-I:%M %p %Z'),
-                "GlueJobLastModifiedOn": glue_resp['JobRun'].get('LastModifiedOn', '').strftime('%x, %-I:%M %p %Z')
-            }
-
-            task_output_json = json.dumps(task_output_dict)
-
-            logger.info('Sending "Task Succeeded" signal to Step Functions..')
-            sfn_resp = sfn.send_task_success(
-                taskToken=sfn_task_token,
-                output=task_output_json
-            )
-
-            # Delete item
-            resp = ddb_table.delete_item(
-                Key={
-                    'sfn_activity_arn': sfn_activity_arn,
-                    'glue_job_run_id': glue_job_run_id
-                }
-            )
-
-            # Task succeeded, next item
-
-        elif job_run_state in ['STARTING', 'RUNNING', 'STARTING', 'STOPPING']:
-            logger.debug('Job with Run Id {} hasn\'t succeeded yet.'.format(glue_job_run_id))
-
-            # Send heartbeat
-            sfn_resp = sfn.send_task_heartbeat(
-                taskToken=sfn_task_token
-            )
-
-            logger.debug('Heartbeat sent to Step Functions.')
-
-            # Heartbeat sent, next item
-
-        elif job_run_state in ['FAILED', 'STOPPED']:
-
-            message = 'Glue job "{}" run with Run Id "{}" failed. Last state: {}. Error message: {}'\
-                .format(glue_job_name, glue_job_run_id[:8] + "...", job_run_state, job_run_error_message)
-
-            logger.error(message)
-
-            message_json={
-                'glue_job_name': glue_job_name,
-                'glue_job_run_id': glue_job_run_id,
-                'glue_job_run_state': job_run_state,
-                'glue_job_run_error_msg': job_run_error_message
-            }
-
-            sfn_resp = sfn.send_task_failure(
-                taskToken=sfn_task_token,
-                cause=json.dumps(message_json),
-                error='GlueJobFailedError'
-            )
-
-            # Delete item
-            resp = ddb_table.delete_item(
-                Key={
-                    'sfn_activity_arn': sfn_activity_arn,
-                    'glue_job_run_id': glue_job_run_id
-                }
-            )
-
-            logger.error(message)
-
-        # Task failed, next item
+        try:
+
+            logger.debug('Polling Glue job run status..')
+
+            # Query glue job status...
+            glue_resp = glue.get_job_run(
+                JobName=glue_job_name,
+                RunId=glue_job_run_id,
+                PredecessorsIncluded=False
+            )
+
+            job_run_state = glue_resp['JobRun']['JobRunState']
+            job_run_error_message = glue_resp['JobRun'].get('ErrorMessage', '')
+
+            logger.debug('Job with Run Id {} is currently in state "{}"'.format(glue_job_run_id, job_run_state))
+
+            # If Glue job completed, return success:
+            if job_run_state in ['SUCCEEDED']:
+
+                logger.info('Job with Run Id {} SUCCEEDED.'.format(glue_job_run_id))
+
+                # Build an output dict and format it as JSON
+                task_output_dict = {
+                    "GlueJobName": glue_job_name,
+                    "GlueJobRunId": glue_job_run_id,
+                    "GlueJobRunState": job_run_state,
+                    "GlueJobStartedOn": glue_resp['JobRun'].get('StartedOn', '').strftime('%x, %-I:%M %p %Z'),
+                    "GlueJobCompletedOn": glue_resp['JobRun'].get('CompletedOn', '').strftime('%x, %-I:%M %p %Z'),
+                    "GlueJobLastModifiedOn": glue_resp['JobRun'].get('LastModifiedOn', '').strftime('%x, %-I:%M %p %Z')
+                }
+
+                task_output_json = json.dumps(task_output_dict)
+
+                logger.info('Sending "Task Succeeded" signal to Step Functions..')
+                sfn_resp = sfn.send_task_success(
+                    taskToken=sfn_task_token,
+                    output=task_output_json
+                )
+
+                # Delete item
+                resp = ddb_table.delete_item(
+                    Key={
+                        'sfn_activity_arn': sfn_activity_arn,
+                        'glue_job_run_id': glue_job_run_id
+                    }
+                )
+
+                # Task succeeded, next item
+
+            elif job_run_state in ['STARTING', 'RUNNING', 'STARTING', 'STOPPING']:
+                logger.debug('Job with Run Id {} hasn\'t succeeded yet.'.format(glue_job_run_id))
+
+                # Send heartbeat
+                sfn_resp = sfn.send_task_heartbeat(
+                    taskToken=sfn_task_token
+                )
+
+                logger.debug('Heartbeat sent to Step Functions.')
+
+                # Heartbeat sent, next item
+
+            elif job_run_state in ['FAILED', 'STOPPED']:
+
+                message = 'Glue job "{}" run with Run Id "{}" failed. Last state: {}. Error message: {}' \
+                    .format(glue_job_name, glue_job_run_id[:8] + "...", job_run_state, job_run_error_message)
+
+                logger.error(message)
+
+                message_json = {
+                    'glue_job_name': glue_job_name,
+                    'glue_job_run_id': glue_job_run_id,
+                    'glue_job_run_state': job_run_state,
+                    'glue_job_run_error_msg': job_run_error_message
+                }
+
+                sfn_resp = sfn.send_task_failure(
+                    taskToken=sfn_task_token,
+                    cause=json.dumps(message_json),
+                    error='GlueJobFailedError'
+                )
+
+                # Delete item
+                resp = ddb_table.delete_item(
+                    Key={
+                        'sfn_activity_arn': sfn_activity_arn,
+                        'glue_job_run_id': glue_job_run_id
+                    }
+                )
+
+                logger.error(message)
+            # Task failed, next item
+        except Exception as e:
+            logger.error('There was a problem checking status of Glue job "{}"..'.format(glue_job_name))
+            logger.error('Glue job Run Id "{}"'.format(glue_job_run_id))
+            logger.error('Reason: {}'.format(e.message))
+            logger.info('Checking next Glue job.')
 
 
 glue = boto3.client('glue')
````

**lambda/ons3objectcreated/ons3objectcreated.py** (−1)

````diff
@@ -6,7 +6,6 @@
 import logging, logging.config
 from botocore.client import Config
 
-s3 = boto3.client('s3')
 # Because Step Functions client uses long polling, read timeout has to be > 60 seconds
 sfn_client_config = Config(connect_timeout=50, read_timeout=70)
 sfn = boto3.client('stepfunctions', config=sfn_client_config)
````

**lambda/s3-deployment-descriptor.json** (+6 −6)

````diff
@@ -1,14 +1,14 @@
 {
   "athenarunner": {
-    "SourceS3BucketName": "<NO-DEFAULT>",
-    "SourceS3Key":"src/athenarunner.zip"
+    "ArtifactBucketName": "<NO-DEFAULT>",
+    "LambdaSourceS3Key": "src/athenarunner.zip"
   },
   "gluerunner": {
-    "SourceS3BucketName": "<NO-DEFAULT>",
-    "SourceS3Key":"src/gluerunner.zip"
+    "ArtifactBucketName": "<NO-DEFAULT>",
+    "LambdaSourceS3Key": "src/gluerunner.zip"
   },
   "ons3objectcreated": {
-    "SourceS3BucketName": "<NO-DEFAULT>",
-    "SourceS3Key":"src/ons3objectcreated.zip"
+    "ArtifactBucketName": "<NO-DEFAULT>",
+    "LambdaSourceS3Key": "src/ons3objectcreated.zip"
   }
 }
````
