In the previous post we built a Docker image with time series analysis libraries. In this post we will write the scripts that perform the forecast. All the code we write in this post will live in the `/fargate` sub-directory.
## A little bit of Python
Create a directory called `src`. This is where the Python source files will be placed. In the `src` directory, create a file called `__init__.py`. This file tells Python that the directory can be imported like a normal library.

Create a file called `index.py` with the following code:
"""
Entry point for forecasting requests
"""
import json
from src.forecast_models.exponential_smoothing_model import ExponentialSmoothingModel
def handler(jsonstring):
"""
Accepts a JSON string payload
Returns a JSON string with forecast predictions
"""
try:
payload = json.loads(jsonstring)
time_series = payload['time_series']
aggregation = payload['aggregation']
prediction = ExponentialSmoothingModel(time_series, aggregation).magic()
return prediction.to_json(orient='records', date_format='None')
except Exception: # pylint: disable=broad-except
return '{"message":"Unknown error ocurred"}'
This file defines a method called `handler`, which accepts a JSON string with the input data and returns a JSON string of the generated forecast. In `index.py` we import `src.forecast_models.exponential_smoothing_model`. This is where the magic actually happens, hence the method being called `magic()`.
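To make the contract concrete, here is a minimal sketch of calling `handler` directly once the model module from the next section is in place; the dates and quantities are invented for illustration:

```python
# A hypothetical smoke test for handler; all values are invented.
import json

from src.index import handler

payload = json.dumps({
    'time_series': [
        # In practice you need at least two full seasonal cycles of data.
        {'date': '2020-01-01', 'quantity': 10},
        {'date': '2020-01-02', 'quantity': 12},
    ],
    'aggregation': 'day',
})

# Prints the JSON predictions, or the error message if the data is too short.
print(handler(payload))
```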
## The magic
In the `src` directory, create another directory called `forecast_models`. Create a file called `exponential_smoothing_model.py` with the following code:
"""
Exponential smoothing model
Utilizes Holt-Winters model under the hood
"""
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing
class ExponentialSmoothingModel:
def __init__(self, json_data, aggregation):
self.json_data = json_data
self.aggregation = aggregation
self.data_frame = pd.DataFrame(json_data)
def magic(self):
train = self.data_frame.loc[:, 'quantity']
model = ExponentialSmoothing(
train, seasonal='mul', seasonal_periods=self.seasonal_period()).fit()
prediction = model.predict(
start=train.index[0], end=self.prediction_end_index())
predicted_df = self.prediction_dataframe()
predicted_df['quantity'] = prediction
predicted_df['date'] = predicted_df['date'].dt.strftime('%Y-%m-%d')
return predicted_df
def seasonal_period(self):
if self.aggregation == 'month':
return 12
elif self.aggregation == 'week':
return 365 / 7
else:
return 365
def prediction_end_index(self):
return len(self.data_frame.index) + self.seasonal_period()
def prediction_dataframe(self):
date_range = pd.date_range(
self.data_frame['date'][0], periods=self.prediction_end_index())
return pd.DataFrame(date_range, columns=['date'])
This file turns the JSON data into a Pandas `DataFrame` and feeds it into the Holt-Winters model. The `magic()` method then returns another `DataFrame` with the model's predictions over the existing data set plus a future prediction; in this case we forecast one year into the future. There is also some logic around aggregation in the data set, but those are implementation details and not really relevant to setting up forecasting on ECS.
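As a quick sanity check of the model class itself, a sketch along these lines should work, assuming three years of synthetic monthly data (all values invented):

```python
# Illustrative only: three years of synthetic monthly data with a simple
# seasonal pattern, fed straight into the model class.
import pandas as pd

from src.forecast_models.exponential_smoothing_model import ExponentialSmoothingModel

dates = pd.date_range('2018-01-01', periods=36, freq='MS')
data = [
    {'date': date.strftime('%Y-%m-%d'), 'quantity': 100 + 10 * (i % 12)}
    for i, date in enumerate(dates)
]

forecast = ExponentialSmoothingModel(data, 'month').magic()
print(forecast.tail(12))  # the final rows are the one-year forecast
```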
## The entrypoint
Add the following code to a file called `ecsforecast` in the root directory of the project:
```python
#!/usr/bin/env python
import os

import boto3

from src.index import handler


def getjson():
    # Prefer an inline payload; fall back to fetching the input file from S3
    json_string = os.getenv('INPUT_JSON_STRING')
    if not json_string:
        s3_bucket = os.getenv('S3_BUCKET')
        key = os.getenv('S3_KEY')
        s3_client = boto3.client('s3')
        json_string = s3_client.get_object(
            Bucket=s3_bucket, Key=key)['Body'].read().decode('utf-8')
    return json_string


def create_s3_object(json_string):
    # Write the forecast next to the input, swapping 'input' for 'output' in the key
    s3_bucket = os.getenv('S3_BUCKET')
    key = os.getenv('S3_KEY').replace('input', 'output')
    s3_client = boto3.client('s3')
    s3_client.put_object(Body=json_string, Bucket=s3_bucket, Key=key)
    return key


json_string = getjson()
prediction = handler(json_string)
key = create_s3_object(prediction)
print(key)
```
This file will be the entrypoint for calling the forecasting models. Notice the line `os.getenv('S3_KEY')`: this tells the script to retrieve the JSON file with that key from the S3 bucket named by `S3_BUCKET`.
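For a quick local test, you can export the environment variables the script reads and run it directly; the bucket and key below are placeholders, and valid AWS credentials are assumed:

```sh
# Placeholder bucket and key; requires AWS credentials with access to them.
export S3_BUCKET=my-forecast-bucket
export S3_KEY=input/sales.json
./ecsforecast
```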
## Input format
The prediction engine expects the JSON file in S3 to have the following format.
```json
{
  "time_series": [
    {
      "date": "the date of this data point",
      "quantity": "the quantity observed on that day"
    },
    ...
  ],
  "aggregation": "day/week/month: this allows for seasonality to be factored in"
}
```
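As a concrete illustration, a daily-aggregated payload might look like this (values invented):

```json
{
  "time_series": [
    { "date": "2020-01-01", "quantity": 42 },
    { "date": "2020-01-02", "quantity": 37 }
  ],
  "aggregation": "day"
}
```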
## Adding magic to the image
Now that we have all the code needed to run the forecasting model, we will add it to the Docker image. Append the following lines to the `Dockerfile`:
```dockerfile
COPY ./src /app/src
COPY ./ecsforecast /app
RUN chmod +x ./ecsforecast
ENTRYPOINT ["/app/ecsforecast"]
```
Here we are copying the `src` directory with all our forecasting code into the container, then copying the `ecsforecast` file into the main working directory of the image. `RUN chmod +x ./ecsforecast` gives execute permissions to the `ecsforecast` file. Lastly, `ENTRYPOINT ["/app/ecsforecast"]` sets the script to run when `docker run` is called.
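To try the finished image locally, something along these lines should work; the image tag, bucket, and key are placeholders, and the `-e` flags without values pass the matching AWS credentials through from the host environment:

```sh
# Placeholder names; assumes AWS credentials are set in the host environment.
docker build -t ecsforecast .
docker run \
  -e S3_BUCKET=my-forecast-bucket \
  -e S3_KEY=input/sales.json \
  -e AWS_ACCESS_KEY_ID \
  -e AWS_SECRET_ACCESS_KEY \
  -e AWS_DEFAULT_REGION \
  ecsforecast
```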