In Part 3, AWS Fargate was set up and running. Now we will set up the serverless Lambda application to run the forecasting tasks.
All the code in this section will be in the `lambda` sub-directory, unless explicitly stated otherwise.
Installing Serverless
You will need to have Node.js and npm installed locally.
In your terminal, install Serverless:

```sh
npm install -g serverless
```
Create the serverless project in the `lambda/` directory.

```sh
serverless create \
  --template aws-python3 \
  --name ecsforecast-lambda
```
This will create a `serverless.yml` file that holds the configuration for the Lambda and a `handler.py` file that contains the Python code for the Lambda.
Open `serverless.yml` and uncomment the `iamRoleStatements` section. These statements let the Lambda pass the task execution role to ECS and read and write the S3 bucket. Add the following code to the section:
```yaml
- Effect: "Allow"
  Action:
    - "iam:PassRole"
  Resource:
    Fn::Join:
      - ''
      - - 'arn:aws:iam::'
        - Ref: 'AWS::AccountId'
        - ':role/ecsTaskExecutionRole'
- Effect: "Allow"
  Action:
    - "s3:PutObject"
    - "s3:GetObject"
  Resource:
    Fn::Join:
      - ''
      - - 'arn:aws:s3:::'
        - ${self:custom.s3_bucket}
        - '/*'
- Effect: "Allow"
  Action:
    - "s3:ListBucket"
  Resource:
    Fn::Join:
      - ''
      - - 'arn:aws:s3:::'
        - ${self:custom.s3_bucket}
```
Add the custom values at the root of the file:
```yaml
custom:
  defaultStage: dev
  currentStage: ${opt:stage, self:custom.defaultStage}
  s3_bucket: ${file(./config.${self:custom.currentStage}.json):s3Bucket}
  vpc_subnet: ${file(./config.${self:custom.currentStage}.json):vpcSubnet}
  cluster_name: ${file(./config.${self:custom.currentStage}.json):clusterName}
  task_definition: ${file(./config.${self:custom.currentStage}.json):taskDefinition}
```
Add the following line after `runtime` in the `provider` node:

```yaml
stage: ${self:custom.currentStage}
```
Under the `provider` node, add:

```yaml
iamManagedPolicies:
  - arn:aws:iam::aws:policy/AmazonECS_FullAccess
```
The `functions` node should already be populated with a `hello` function. Rename it to `ecsforecast`, and rename the function in `handler.py` to `queue_forecast`. Your `functions` node in `serverless.yml` should look like this:
```yaml
functions:
  ecsforecast:
    handler: handler.queue_forecast
    environment:
      RUNNER: ${self:custom.task_definition}
      SUBNET: ${self:custom.vpc_subnet}
      S3_BUCKET: ${self:custom.s3_bucket}
      CLUSTER_NAME: ${self:custom.cluster_name}
    events:
      - http:
          path: /ecsforecast
          method: post
```
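With the `http` event above, API Gateway invokes the function through the Lambda proxy integration, so the handler receives the request as a dict whose `body` field holds the raw payload string. A trimmed-down sketch of such an event (the sample values are made up; real events carry many more fields):

```python
# Trimmed-down sketch of an API Gateway Lambda-proxy event.
# Only `body` is used by our handler; real events include many more fields.
event = {
    "httpMethod": "POST",
    "path": "/ecsforecast",
    "headers": {"Content-Type": "application/json"},
    "body": '{"time_series": [{"date": "2020-01-01", "quantity": 5}], "aggregation": "day"}',
}
```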
Lastly, install the `serverless-python-requirements` plugin.

```sh
serverless plugin install --name serverless-python-requirements
```
Create a file called `config.dev.json` with the configuration for your ECS tasks and S3 bucket.
```json
{
  "s3Bucket": <name of the S3 bucket created>,
  "vpcSubnet": <pick a subnet from the cluster, one of the subnets used when you run tasks>,
  "clusterName": <name of the cluster created>,
  "taskDefinition": <name of the task definition created>
}
```
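If you are unsure which value to use for `vpcSubnet`, a short `boto3` snippet can list the subnets in your account so you can pick the one your cluster's tasks run in (a sketch; it assumes your default AWS credentials and region are configured):

```python
import boto3

# Print every subnet visible to the configured account/region.
# Pick one in the VPC that your Fargate cluster launches tasks into.
ec2 = boto3.client('ec2')
for subnet in ec2.describe_subnets()['Subnets']:
    print(subnet['SubnetId'], subnet['VpcId'], subnet['AvailabilityZone'])
```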
The Serverless settings are done; now we need to add the AWS Python SDK. Create a file called `requirements.txt` with a single line, `boto3`, in it. `boto3` is the AWS Python SDK.
Open `handler.py` and change the function name to `queue_forecast` with the following body:
```python
def queue_forecast(event, context):
    """
    Handler function to queue an ecsforecast task.
    """
    try:
        payload = event['body']
        request_id = context.aws_request_id
        runner = os.getenv('RUNNER')
        subnet = os.getenv('SUBNET')
        s3_bucket = os.getenv('S3_BUCKET')
        cluster = os.getenv('CLUSTER_NAME')
        now = datetime.datetime.now()
        expires = now + datetime.timedelta(hours=2)
        # Store the request body in S3 as the input for the task.
        s3_client = boto3.client('s3')
        s3_client.put_object(Body=payload, Expires=expires, Bucket=s3_bucket,
                             Key=request_id + '/input.json')
        # Launch the Fargate task and tell it where to find its input.
        ecs_client = boto3.client('ecs')
        ecs_client.run_task(
            cluster=cluster,
            taskDefinition=runner,
            launchType='FARGATE',
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': [subnet],
                    'assignPublicIp': 'ENABLED'
                }
            },
            overrides={
                'containerOverrides': [
                    {
                        'environment': [
                            {
                                'name': 'INPUT_JSON_URL',
                                'value': request_id + '/input.json'
                            },
                            {
                                'name': 'S3_BUCKET',
                                'value': s3_bucket
                            }
                        ],
                        # The container name matches the task definition name.
                        'name': runner
                    }
                ]
            }
        )
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': '{"message":"Task queued"}'
        }
    except Exception as err:  # pylint: disable=broad-except
        print(str(err))
        return {
            'statusCode': 500,
            'headers': {'Content-Type': 'application/json'},
            'body': '{"message":"Internal server error"}'
        }
```
Add these imports before the function definition:

```python
import datetime
import os

import boto3
```
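Before deploying, you can smoke-test the handler locally with a fake event and context. This is a sketch: `FakeContext` is a stand-in for the real Lambda context, the environment values are hypothetical and must point at your own resources, and the call really does hit S3 and ECS, so run it with valid AWS credentials:

```python
import os

# Hypothetical values; substitute your own resources from config.dev.json.
os.environ['RUNNER'] = 'ecsforecast-task'
os.environ['SUBNET'] = 'subnet-0123456789abcdef0'
os.environ['S3_BUCKET'] = 'my-ecsforecast-bucket'
os.environ['CLUSTER_NAME'] = 'ecsforecast-cluster'

import handler  # the module containing queue_forecast

class FakeContext:
    """Minimal stand-in for the Lambda context object."""
    aws_request_id = 'local-test-request'

event = {'body': '{"time_series": [{"date": "2020-01-01", "quantity": 5}], "aggregation": "day"}'}
print(handler.queue_forecast(event, FakeContext()))  # expect a 200 response dict
```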
This handler takes the body of an HTTPS request and writes it to S3 as a file called `input.json`, stored under the key `<request_id>/input.json`. (Note that the `Expires` value only sets the object's `Expires` metadata header; it does not delete the object automatically.) The handler then calls `run_task` with the task definition and returns `Task queued` if it did not encounter any errors. The Fargate cluster will then run the forecast function and place the `output.json` file under the same key prefix as `input.json`.
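Because `run_task` only queues the work, the HTTP response comes back before the forecast exists. A small polling sketch (assuming the bucket name and the request id you want to check) shows one way to wait for the result:

```python
import json
import time

import boto3

def wait_for_forecast(bucket, request_id, timeout=300, interval=10):
    """Poll S3 until the Fargate task has written output.json, or give up."""
    s3 = boto3.client('s3')
    key = request_id + '/output.json'
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            return json.loads(obj['Body'].read())
        except s3.exceptions.NoSuchKey:
            time.sleep(interval)  # task still running; check again shortly
    raise TimeoutError('forecast did not appear within %d seconds' % timeout)
```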
Deploy
Serverless makes it very easy to deploy the Lambda. Run the following commands in the `lambda/` directory.
```sh
serverless package
serverless deploy
```
Once the deployment is complete, the deploy output lists the API endpoint, and you can send a POST request to it with the time series data in the request body:
```json
{
  "time_series": [
    {
      "date": "<date>",
      "quantity": <some number>
    }
  ],
  "aggregation": "<day/week/month>"
}
```
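For example, using only the Python standard library (a sketch: the URL is a placeholder for the endpoint printed by `serverless deploy`, and the payload values are made up):

```python
import json
import urllib.request

# Placeholder URL; use the endpoint printed by `serverless deploy`.
url = 'https://abc123.execute-api.us-east-1.amazonaws.com/dev/ecsforecast'

payload = {
    'time_series': [{'date': '2020-01-01', 'quantity': 5}],
    'aggregation': 'day',
}
req = urllib.request.Request(
    url,
    data=json.dumps(payload).encode('utf-8'),
    headers={'Content-Type': 'application/json'},
    method='POST',
)
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode())  # expect {"message":"Task queued"}
```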
Once the Fargate task finishes, the output of the forecast function will be in S3 under the same `<request_id>/` prefix.