In Part 3, we set up AWS Fargate and got it running. Now we will set up the serverless Lambda application to run the forecasting tasks.

All the code in this section will be in the lambda sub-directory, unless explicitly stated otherwise.

Installing Serverless

You will need to have Node.js and npm installed locally.

In your terminal, install Serverless globally:

npm install -g serverless

Create the serverless project in the lambda/ directory.

serverless create \
  --template aws-python3 \
  --name ecsforecast-lambda

This will create a serverless.yml file that holds the configuration for the Lambda and a handler.py file that holds the Python code for the Lambda.

Open serverless.yml and un-comment the iamRoleStatements section.

Add the following code to the section:

- Effect: "Allow"
  Action:
    - "iam:PassRole"
  Resource:
    Fn::Join:
      - ''
      - - 'arn:aws:iam::'
        - Ref: 'AWS::AccountId'
        - ':role/ecsTaskExecutionRole'
- Effect: "Allow"
  Action:
    - "s3:PutObject"
    - "s3:GetObject"
  Resource:
    Fn::Join:
      - ''
      - - 'arn:aws:s3:::'
        - ${self:custom.s3_bucket}
        - '/*'
- Effect: "Allow"
  Action:
    - "s3:ListBucket"
  Resource:
    Fn::Join:
      - ''
      - - 'arn:aws:s3:::'
        - ${self:custom.s3_bucket}
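
With an empty delimiter, Fn::Join simply concatenates its parts, so the three statements above resolve to plain ARN strings. A minimal Python sketch of the resulting values, assuming a hypothetical account ID and bucket name (neither comes from this series):

```python
def join(delimiter, parts):
    """Mimic CloudFormation's Fn::Join for plain string parts."""
    return delimiter.join(parts)


# Hypothetical values, for illustration only.
account_id = "123456789012"
s3_bucket = "my-forecast-bucket"

# Role that the Lambda is allowed to pass to ECS.
pass_role_arn = join("", ["arn:aws:iam::", account_id, ":role/ecsTaskExecutionRole"])
# Object-level ARN, used for s3:PutObject and s3:GetObject.
object_arn = join("", ["arn:aws:s3:::", s3_bucket, "/*"])
# Bucket-level ARN, used for s3:ListBucket.
bucket_arn = join("", ["arn:aws:s3:::", s3_bucket])

print(pass_role_arn)
print(object_arn)
print(bucket_arn)
```

The object actions need the ARN ending in /*, while s3:ListBucket applies to the bucket itself, which is why the statements use two different resources.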

Add the custom values at the root of the file:

custom:
  defaultStage: dev
  currentStage: ${opt:stage, self:custom.defaultStage}
  s3_bucket: ${file(./config.${self:custom.currentStage}.json):s3Bucket}
  vpc_subnet: ${file(./config.${self:custom.currentStage}.json):vpcSubnet}
  cluster_name: ${file(./config.${self:custom.currentStage}.json):clusterName}
  task_definition: ${file(./config.${self:custom.currentStage}.json):taskDefinition}

Add the following line after runtime in the provider node:

stage: ${self:custom.currentStage}

Under the provider node, add:

iamManagedPolicies:
    - arn:aws:iam::aws:policy/AmazonECS_FullAccess

The functions node should already be populated with the hello function.

Rename it to ecsforecast. Also rename the function in handler.py to queue_forecast. Your functions node in serverless.yml should look like this:

functions:
  ecsforecast:
    handler: handler.queue_forecast
    environment:
      RUNNER: ${self:custom.task_definition}
      SUBNET: ${self:custom.vpc_subnet}
      S3_BUCKET: ${self:custom.s3_bucket}
      CLUSTER_NAME: ${self:custom.cluster_name}
    events:
      - http:
          path: /ecsforecast
          method: post

Lastly, install the serverless-python-requirements plugin.

serverless plugin install --name serverless-python-requirements

Create a file called config.dev.json with the configuration for your ECS task and S3 bucket.

{
    "s3Bucket": <name of the S3 bucket created>,
    "vpcSubnet": <ID of a subnet from the cluster, e.g. one of the subnets used when you run tasks>,
    "clusterName": <name of the cluster created>,
    "taskDefinition": <name of the task definition created>
}
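
Because serverless.yml reads all four values from this file, a missing or empty key tends to surface only at deploy time. The following is a minimal sketch of a pre-deploy check, not part of the original project; the helper name load_stage_config is hypothetical, and the stage defaults to dev to mirror defaultStage:

```python
import json
from pathlib import Path

# Keys that serverless.yml looks up in config.<stage>.json.
REQUIRED_KEYS = ("s3Bucket", "vpcSubnet", "clusterName", "taskDefinition")


def load_stage_config(stage="dev", directory="."):
    """Load config.<stage>.json and check that every expected key is set."""
    path = Path(directory) / f"config.{stage}.json"
    with path.open() as handle:
        config = json.load(handle)
    # Treat absent keys and empty values the same way.
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise KeyError(f"{path.name} is missing values for: {', '.join(missing)}")
    return config
```

Calling load_stage_config("dev") from the lambda/ directory returns the parsed dictionary, or raises if the file is incomplete.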

The Serverless settings are done. Next, we need to add the AWS Python SDK.

Create a file called requirements.txt containing the single line boto3. boto3 is the AWS SDK for Python.

Open handler.py and change the function name to queue_forecast with the following body:

def queue_forecast(event, context):
    """
    Handler function to add an ecsforecast task
    """
    try:
        payload = event['body']
        request_id = context.aws_request_id
        runner = os.getenv('RUNNER')
        subnet = os.getenv('SUBNET')
        s3_bucket = os.getenv('S3_BUCKET')
        cluster = os.getenv('CLUSTER_NAME')

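        # Expires only sets the object's HTTP Expires (cache) header;
        # it does not delete the object after two hours.
        now = datetime.datetime.now(datetime.timezone.utc)
        expires = now + datetime.timedelta(hours=2)

        s3_client = boto3.client('s3')
        s3_client.put_object(
            Body=payload,
            Expires=expires,
            Bucket=s3_bucket,
            Key=request_id + '/input.json'
        )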

        ecs_client = boto3.client('ecs')
        ecs_client.run_task(
            cluster=cluster,
            taskDefinition=runner,
            launchType='FARGATE',
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': [subnet],
                    'assignPublicIp': 'ENABLED'
                }
            },
            overrides={
                'containerOverrides': [
                    {
                        'environment': [
                            {
                                'name': 'INPUT_JSON_URL',
                                'value': request_id + '/input.json'
                            },
                            {
                                'name': 'S3_BUCKET',
                                'value': s3_bucket
                            }
                        ],
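                        # Must match the container name in the task definition;
                        # this assumes it equals the task definition name.
                        'name': runner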
                    }
                ]
            }
        )

        
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': '{"message":"Task queued"}'
        }
    except Exception as err: # pylint: disable=broad-except
        print(str(err))
        return {
            'statusCode': 500,
            'headers': {'Content-Type': 'application/json'},
            'body': '{"message":"Internal server error"}'
        }

Add imports before the function definition:

import datetime
import os
import boto3

The handler takes the body of an HTTPS request and writes it to S3 as a file called input.json, under the key <request_id>/input.json.

The handler then calls run_task with the task definition and returns Task queued if it did not encounter any errors. The Fargate cluster will now run the forecast function and place the output.json file under the same <request_id>/ prefix as input.json.
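
On the container side, the task receives the two environment variables set in containerOverrides. Note that, despite its name, INPUT_JSON_URL holds the S3 key rather than a URL. The sketch below shows one way the task might use them; it is an illustration under assumptions, not the container code from the earlier parts. The names output_key_for and run_forecast_task are hypothetical, forecast stands for whatever callable produces the result, and s3_client is expected to behave like boto3.client('s3'):

```python
import json
import posixpath


def output_key_for(input_key):
    """Return the key for output.json next to the given input key."""
    prefix = posixpath.dirname(input_key)
    return posixpath.join(prefix, "output.json")


def run_forecast_task(s3_client, bucket, input_key, forecast):
    """Read the input from S3, run the forecast, and write the output beside it."""
    obj = s3_client.get_object(Bucket=bucket, Key=input_key)
    payload = json.loads(obj["Body"].read())
    result = forecast(payload)  # hypothetical forecast callable
    out_key = output_key_for(input_key)
    s3_client.put_object(
        Bucket=bucket,
        Key=out_key,
        Body=json.dumps(result).encode("utf-8"),
    )
    return out_key


# Inside the container this might be wired up as follows
# (requires boto3 and AWS credentials, so it is left as a comment):
#   run_forecast_task(boto3.client("s3"), os.environ["S3_BUCKET"],
#                     os.environ["INPUT_JSON_URL"], forecast)
```

Passing the client in as an argument keeps the function easy to exercise with a stub, without touching AWS.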

Deploy

Serverless makes it easy to deploy the Lambda.

Run the following commands in the lambda/ directory.

serverless package
serverless deploy

Once the deployment is complete, you will be able to send a POST request to the Lambda with the time series data in the request body.

{
  "time_series": [
    {
      "date": <date>,
      "quantity": <some number>
    }
  ],
  "aggregation": <"day", "week" or "month">
}
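
As an illustration, the request could be built in Python with the standard library alone. This is a sketch under assumptions: build_forecast_request is a hypothetical helper, the endpoint URL is a placeholder for the one Serverless prints after deploying, and the ISO-style date strings are a guess, since the date format is not specified here.

```python
import json
import urllib.request


def build_forecast_request(endpoint_url, time_series, aggregation):
    """Build a POST request carrying the time series as a JSON body."""
    body = json.dumps({
        "time_series": time_series,
        "aggregation": aggregation,
    }).encode("utf-8")
    return urllib.request.Request(
        endpoint_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder endpoint and sample data, for illustration only.
request = build_forecast_request(
    "https://example.com/dev/ecsforecast",
    [{"date": "2020-01-01", "quantity": 12}],
    "day",
)
# To actually send it: urllib.request.urlopen(request)
```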

Once the Fargate task finishes, the output of the forecast function will be in S3.