In the previous post we built a Docker image with time series analysis libraries. In this post we will write the scripts that perform the forecast.

All the code we write in this post will be in the /fargate sub-directory.

A little bit of Python

Create a directory called src. This is where the Python source files will be placed.

In the src directory, create a file called __init__.py. This file marks the directory as a package, so Python can import modules from it the way it would from an installed library.

Create a file called index.py with the following code:

"""
Entry point for forecasting requests
"""

import json

from src.forecast_models.exponential_smoothing_model import ExponentialSmoothingModel


def handler(jsonstring):
    """
    Accepts a JSON string payload
    Returns a JSON string with forecast predictions
    """
    try:
        payload = json.loads(jsonstring)
        time_series = payload['time_series']
        aggregation = payload['aggregation']
        prediction = ExponentialSmoothingModel(time_series, aggregation).magic()
        return prediction.to_json(orient='records', date_format='None')
    except Exception: # pylint: disable=broad-except
        return '{"message":"Unknown error occurred"}'

This file defines a function called handler, which accepts a JSON string containing the input data and returns a JSON string containing the generated forecast. If anything goes wrong along the way, it returns a JSON error message instead.
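To make that contract concrete, here is a minimal sketch of the same JSON-in, JSON-out shape with the model swapped for a plain function, so it runs without pandas or statsmodels. The names handle and echo_forecast are hypothetical and not part of the project.

```python
import json


def handle(jsonstring, forecast):
    """Mirror handler's contract, with the model injected as a function."""
    try:
        payload = json.loads(jsonstring)
        result = forecast(payload['time_series'], payload['aggregation'])
        return json.dumps(result)
    except Exception:  # any failure collapses into one generic message
        return '{"message":"Unknown error occurred"}'


def echo_forecast(time_series, aggregation):
    """Hypothetical stand-in for the real model: returns its input unchanged."""
    return time_series


good = json.dumps({
    'time_series': [{'date': '2020-01-01', 'quantity': 3}],
    'aggregation': 'day',
})
print(handle(good, echo_forecast))        # the series, echoed back as JSON
print(handle('not json', echo_forecast))  # the generic error message
```

One consequence of this design is that a caller cannot tell a malformed payload from a model failure, since both produce the same message.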

In index.py we import src.forecast_models.exponential_smoothing_model. This is where the magic actually happens, hence the method magic() being called.

The magic

In the src directory create another directory called forecast_models.

In forecast_models, create a file called exponential_smoothing_model.py with the following code:

"""
Exponential smoothing model
Utilizes Holt-Winters model under the hood
"""

import pandas as pd

from statsmodels.tsa.holtwinters import ExponentialSmoothing


class ExponentialSmoothingModel:
    def __init__(self, json_data, aggregation):
        self.json_data = json_data
        self.aggregation = aggregation
        self.data_frame = pd.DataFrame(json_data)

    def magic(self):
        train = self.data_frame.loc[:, 'quantity']
        model = ExponentialSmoothing(
            train, seasonal='mul', seasonal_periods=self.seasonal_period()).fit()
        prediction = model.predict(
            start=train.index[0], end=self.prediction_end_index())
        predicted_df = self.prediction_dataframe()
        predicted_df['quantity'] = prediction
        predicted_df['date'] = predicted_df['date'].dt.strftime('%Y-%m-%d')
        return predicted_df

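    def seasonal_period(self):
        # The model and pd.date_range both need a whole number of periods
        if self.aggregation == 'month':
            return 12
        if self.aggregation == 'week':
            return 52  # 365 / 7 is about 52.14, so use the integer part
        return 365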

    def prediction_end_index(self):
        return len(self.data_frame.index) + self.seasonal_period()

    def prediction_dataframe(self):
        date_range = pd.date_range(
            self.data_frame['date'][0], periods=self.prediction_end_index())
        return pd.DataFrame(date_range, columns=['date'])

This file turns the JSON data into a pandas DataFrame and feeds the quantity column into the Holt-Winters model. The model returns its fitted values for the existing data set plus a forecast that extends one seasonal period, that is one year, past the last observation. The aggregation value only selects the seasonal period; that logic is an implementation detail and not really relevant to setting up forecasting on ECS.
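The arithmetic behind the length of the returned data set can be sketched without any libraries. This assumes weekly data uses a seasonal period of 52 (365 / 7 reduced to a whole number); SEASONAL_PERIODS and prediction_length are illustrative names only.

```python
# Periods per season for each aggregation level (52 for weeks is an assumption).
SEASONAL_PERIODS = {'month': 12, 'week': 52, 'day': 365}


def prediction_length(observations, aggregation):
    """Observed points plus one extra season of forecast."""
    # Anything other than 'month' or 'week' is treated as daily data.
    return observations + SEASONAL_PERIODS.get(aggregation, 365)


# Three years of monthly data: 36 fitted values plus 12 forecast months.
print(prediction_length(36, 'month'))  # 48
```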

The entrypoint

Add the following code to a file called ecsforecast in the root directory of the project:

#!/usr/bin/env python

import os

import boto3

from src.index import handler


def getjson():
    json_string = os.getenv('INPUT_JSON_STRING')
    if not json_string:
        s3_bucket = os.getenv('S3_BUCKET')
        key = os.getenv('S3_KEY')
        s3_client = boto3.client('s3')
        json_string = s3_client.get_object(Bucket=s3_bucket, Key=key)['Body'].read().decode('utf-8')
    return json_string

def create_s3_object(json_string):
    s3_bucket = os.getenv('S3_BUCKET')
    key = os.getenv('INPUT_JSON_URL').replace('input', 'output')
    s3_client = boto3.client('s3')
    s3_client.put_object(Body=json_string, Bucket=s3_bucket, Key=key)
    return key


json_string = getjson()
prediction = handler(json_string)
key = create_s3_object(prediction)
print(key)

This file will be the entrypoint for calling the forecasting models.

Notice the call os.getenv('S3_KEY'). When INPUT_JSON_STRING is not set, the script retrieves the JSON file with that key from the S3 bucket named by S3_BUCKET.
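The lookup order can be sketched in isolation, with the environment passed in as a dictionary and the S3 download replaced by a stub so that it runs without AWS credentials. Both resolve_input and fake_fetch are hypothetical names, and the bucket and key values are made up for the example.

```python
def resolve_input(env, fetch):
    """Return the inline payload if present, otherwise fetch it by bucket and key."""
    inline = env.get('INPUT_JSON_STRING')
    if inline:
        return inline
    return fetch(env.get('S3_BUCKET'), env.get('S3_KEY'))


def fake_fetch(bucket, key):
    """Hypothetical stand-in for the S3 download: reports what was requested."""
    return '{"from": "%s/%s"}' % (bucket, key)


# An inline payload wins, and the fetch function is never called.
print(resolve_input({'INPUT_JSON_STRING': '{"inline": true}'}, fake_fetch))

# Without it, the bucket and key decide which object is read.
print(resolve_input({'S3_BUCKET': 'my-bucket', 'S3_KEY': 'input/data.json'}, fake_fetch))
```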

Input format

The prediction engine expects the JSON file in S3 to have the following format.

{
	"time_series": [
		{
			"date": "the date of this data point",
			"quantity": "the quantity observed on that day"
		},
		...
	],
	"aggregation": "day/week/month this allows for seasonality to be factored in"
}
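As a rough illustration, a payload in this shape could be built and checked before it is uploaded. The validate_payload helper is hypothetical, and the YYYY-MM-DD date strings are an assumption, since the format above does not pin down how dates are written.

```python
import json

ALLOWED_AGGREGATIONS = {'day', 'week', 'month'}


def validate_payload(jsonstring):
    """Parse a payload and check the fields the prediction engine reads."""
    payload = json.loads(jsonstring)
    if payload.get('aggregation') not in ALLOWED_AGGREGATIONS:
        raise ValueError('aggregation must be day, week or month')
    series = payload.get('time_series')
    if not isinstance(series, list) or not series:
        raise ValueError('time_series must be a non-empty list')
    for point in series:
        if 'date' not in point or 'quantity' not in point:
            raise ValueError('every data point needs a date and a quantity')
    return payload


# Example payload; the dates and quantities are made up.
example = json.dumps({
    'time_series': [
        {'date': '2020-01-01', 'quantity': 12},
        {'date': '2020-01-02', 'quantity': 15},
    ],
    'aggregation': 'day',
})
print(len(validate_payload(example)['time_series']))  # 2
```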

Adding magic to the image

Now that we have all the code needed to run the forecasting model, we will add it to the Docker image. Append the following lines to the Dockerfile:

COPY ./src /app/src
COPY ./ecsforecast /app
RUN chmod +x ./ecsforecast

ENTRYPOINT ["/app/ecsforecast"]

Here we are copying the src directory with all our forecasting code into the container, then copying the ecsforecast file into the main working directory of the image.

RUN chmod +x ./ecsforecast gives execute permissions to the file ecsforecast.

And lastly, ENTRYPOINT ["/app/ecsforecast"] sets the script to run when docker run is called.