Creating a simple Dockerised Flask App

This post walks through creating a simple Dockerised Flask app, covering the basic steps required to build a REST(ish) service that can be run as a Docker container.

The App

Rather than go with the obvious “Hello, World!” type example, I decided I’d try to do something just a touch more interesting and create a REST(ish) resource that returns a response with the status code that was passed in the path of the request. This might be useful for test frameworks where you want to validate some code’s reaction to a given response status code or similar.

I’m using Flask to create a quick and dirty solution, mostly to keep it simple. Firstly, the requirements.txt file is simple, with just one requirement:

Flask

This will get us the Flask package to use in our simple REST(ish) service, which is essentially this; (forgive the inline status_codes dict)

import json
from flask import Flask, Response 

status_codes = {
        "100": "Continue",
        "101": "Switching Protocols",
        "102": "Processing",
        "103": "Early Hints",
        "200": "OK",
        "201": "Created",
        "202": "Accepted",
        "203": "Non-Authoritative Information",
        "204": "No Content",
        "205": "Reset Content",
        "206": "Partial Content",
        "207": "Multi-Status",
        "208": "Already Reported",
        "226": "IM Used",
        "300": "Multiple Choices",
        "301": "Moved Permanently",
        "302": "Found",
        "303": "See Other",
        "304": "Not Modified",
        "305": "Use Proxy",
        "307": "Temporary Redirect",
        "308": "Permanent Redirect",
        "400": "Bad Request",
        "401": "Unauthorized",
        "402": "Payment Required",
        "403": "Forbidden",
        "404": "Not Found",
        "405": "Method Not Allowed",
        "406": "Not Acceptable",
        "407": "Proxy Authentication Required",
        "408": "Request Timeout",
        "409": "Conflict",
        "410": "Gone",
        "411": "Length Required",
        "412": "Precondition Failed",
        "413": "Payload Too Large",
        "414": "URI Too Long",
        "415": "Unsupported Media Type",
        "416": "Range Not Satisfiable",
        "417": "Expectation Failed",
        "421": "Misdirected Request",
        "422": "Unprocessable Entity",
        "423": "Locked",
        "424": "Failed Dependency",
        "425": "Too Early",
        "426": "Upgrade Required",
        "428": "Precondition Required",
        "429": "Too Many Requests",
        "431": "Request Header Fields Too Large",
        "451": "Unavailable For Legal Reasons",
        "500": "Internal Server Error",
        "501": "Not Implemented",
        "502": "Bad Gateway",
        "503": "Service Unavailable",
        "504": "Gateway Timeout",
        "505": "HTTP Version Not Supported",
        "506": "Variant Also Negotiates",
        "507": "Insufficient Storage",
        "508": "Loop Detected",
        "510": "Not Extended",
        "511": "Network Authentication Required"
}

app = Flask(__name__)

@app.route('/<code>', methods=['GET', 'POST', 'HEAD', 'PUT'])
def status_code(code):
    message = status_codes.get(code, "Unknown Status Code")
    return Response(status=int(code), response=message)

if __name__ == '__main__':
    # Bind to all interfaces so the app is reachable from outside a container
    app.run(host='0.0.0.0')
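As an aside on the inline status_codes dict: Python’s standard library already knows the registered status codes, so the table could be derived rather than typed out. A minimal sketch, assuming the phrases from http.HTTPStatus are acceptable (a few differ slightly from the hand-written dict depending on Python version). The names build_status_codes and reason_for are hypothetical helpers, not part of the app above.

```python
from http import HTTPStatus


def build_status_codes():
    """Map each registered status code (as a string) to its reason phrase."""
    return {str(status.value): status.phrase for status in HTTPStatus}


def reason_for(code, status_codes):
    """Look up the phrase for a code taken from the URL path.

    Returns None when the path segment is not a number in the 100-599
    range, so the caller can reject it instead of crashing on int().
    """
    try:
        number = int(code)
    except ValueError:
        return None
    if not 100 <= number <= 599:
        return None
    return status_codes.get(str(number), "Unknown Status Code")


status_codes = build_status_codes()
print(reason_for("405", status_codes))
print(reason_for("abc", status_codes))
```

The range check also covers a case the handler above does not: a path such as /abc would otherwise fail inside int(code) and surface as a 500.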

You can test the code by running it with python, which will launch the app on port 5000. A quick test might be:

curl -v http://localhost:5000/405

All being well, this will give you a response of

*   Trying ::1...
* Connection failed
* connect to ::1 port 80 failed: Connection refused
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 5000 (#0)
> GET /405 HTTP/1.1
> Host: localhost
> User-Agent: curl/7.54.0
> Accept: */*
* HTTP 1.0, assume close after body
< HTTP/1.0 405 METHOD NOT ALLOWED
< Content-Type: text/html; charset=utf-8
< Content-Length: 18
< Server: Werkzeug/0.14.1 Python/3.7.2
< Date: Sat, 19 Jan 2019 16:26:34 GMT
* Closing connection 0
Method Not Allowed%

Note that the response status is the code that we passed in the path: HTTP/1.0 405 METHOD NOT ALLOWED.

Running it as a Docker container

Installing Docker

First, you’re going to need Docker on your machine. The best approach is to download Docker Desktop for your particular machine.

Creating the Dockerfile

Dockerfiles require a base image to start from. For a lightweight Python container we can derive our image from the Alpine-based Python image. Alpine itself is a minimal Docker image which is only about 5 MB in size. You can learn more about Alpine on the project’s site.

The Dockerfile below is all that we’re going to need. It assumes the basic file structure of the project is similar to the tree below;

├── Dockerfile
├── app
│   ├──
│   └──
└── requirements.txt

We’ve already covered the application code and requirements.txt, and the other file under app is empty. All that’s left is the Dockerfile:

FROM python:alpine

LABEL Name=docker Version=0.0.1
EXPOSE 5000

WORKDIR /app

# Copy over the application
COPY . /app

RUN python3 -m pip install -r requirements.txt

# Start the application
CMD ["python3", "app/"]


Breaking this down we’re saying that our image is

  • going to be based on the python:alpine image.
  • going to expose something on port 5000 (in this case the app)
  • going to use /app as its working directory
  • going to copy the contents of the project directory to the /app folder on the image
  • going to install the requirements as specified in requirements.txt

Finally, we end with the CMD which specifies what will happen when the container starts. In this case, we’re going to be starting the Flask app.

Building the Docker image

We need to build the image to be able to use it. This is assuming you’ve installed and started Docker on your machine.

To build the image we use the docker build command.

docker build . -t httpcodes:latest

This will give an output with the steps that are performed while building the image

Sending build context to Docker daemon  11.78kB
Step 1/7 : FROM python:alpine
 ---> 1a8edcb29ce4
Step 2/7 : LABEL Name=docker Version=0.0.1
 ---> Using cache
 ---> 2076201409c8
Step 3/7 : EXPOSE 3000
 ---> Using cache
 ---> 63588eaed844
Step 4/7 : WORKDIR /app
 ---> Using cache
 ---> feb03f342d39
Step 5/7 : ADD . /app
 ---> Using cache
 ---> fe2d365303a5
Step 6/7 : RUN python3 -m pip install -r requirements.txt
 ---> Using cache
 ---> b3ecdb9890ad
Step 7/7 : CMD ["python3", "app/"]
 ---> Using cache
 ---> 1616f252e49d
Successfully built 1616f252e49d
Successfully tagged httpcodes:latest

We can now run the image

docker run -d -p 80:5000 httpcodes

This command tells Docker to start a container in the background (-d) based on the httpcodes image (inferring latest because no tag was specified) and to forward a port from the host (your machine) to port 5000 on the container. In this case, all traffic that comes to http://localhost:80 is routed to port 5000 on the container.

Testing the endpoint

As before, we can test the endpoint to make sure it does as we expected.

curl -v http://localhost/405

All being well, this will give you a response of

*   Trying ::1...
* Connection failed
* connect to ::1 port 80 failed: Connection refused
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 80 (#0)
> GET /405 HTTP/1.1
> Host: localhost
> User-Agent: curl/7.54.0
> Accept: */*
* HTTP 1.0, assume close after body
< HTTP/1.0 405 METHOD NOT ALLOWED
< Content-Type: text/html; charset=utf-8
< Content-Length: 18
< Server: Werkzeug/0.14.1 Python/3.7.2
< Date: Sat, 19 Jan 2019 16:26:34 GMT
* Closing connection 0
Method Not Allowed%

Adding retry logic to urllib3 Python code

In this post I’m going to cover the basics of implementing retry logic using urllib3.

There is probably a solid argument for asking “why aren’t you just using requests?”. As it happens, requests uses urllib3 and its Retry functionality.

For the purposes of this post, let’s imagine that we have a REST service and one of the resources is particularly popular, or flaky, and is throwing the occasional 503 HTTP status code.

Our initial code might look something like;

import logging
import urllib3

logger = logging.getLogger(__name__)

http = urllib3.PoolManager()
r = http.request('GET', '')
if r.status == 200:
    logger.info('That was lucky')

We have one chance to get it right. Yes, some convoluted while loop against the status code could be used, but that’s ugly.

Another option available to us is to make use of urllib3.util.Retry and get our request to retry a specified number of times.

import logging
import urllib3
from urllib3.util import Retry
from urllib3.exceptions import MaxRetryError

logger = logging.getLogger(__name__)

http = urllib3.PoolManager()
retry = Retry(3, raise_on_status=True, status_forcelist=range(500, 600))

try:
    r = http.request('GET', '', retries=retry)
except MaxRetryError as m_err:
    logger.error('Failed due to {}'.format(m_err.reason))

In this code we’ve created a Retry object telling it to retry a total of 3 times and raise an exception if all retries are exhausted. The status_forcelist argument lists the HTTP status codes that will be treated as failures and retried.

Some other interesting arguments for the Retry object are:

  • total: The total number of retries that are allowed. Takes precedence over the combined figure of connect and read.
  • read: How many read errors to retry on.
  • connect: How many connection errors to retry on.
  • redirect: How many redirects to allow. This is handy for preventing redirect loops.
  • method_whitelist: Which HTTP methods are retried. By default only idempotent methods are allowed, ruling out POST. (This argument was renamed allowed_methods in urllib3 1.26.)
  • backoff_factor: The factor used to calculate the delay between retries (see the docs for more info).
  • raise_on_status: Whether to raise an exception or return the failed response once retries are exhausted.
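To make the interplay of total, status_forcelist and backoff_factor concrete, here is a rough standard-library sketch of the general idea. It is not urllib3’s implementation: call_with_retries, the fetch callable and the exact delay formula are assumptions made for illustration, and urllib3’s own backoff calculation differs in detail between versions.

```python
import time


class RetriesExhausted(Exception):
    """Raised when every attempt returned a status in the force list."""


def call_with_retries(fetch, total=3, status_forcelist=range(500, 600),
                      backoff_factor=0.5, sleep=time.sleep):
    """Call fetch() until it returns an acceptable status.

    fetch is any zero-argument callable returning an HTTP status code.
    One initial attempt is made, followed by up to `total` retries.
    """
    for attempt in range(total + 1):
        status = fetch()
        if status not in status_forcelist:
            return status
        if attempt < total:
            # Exponential backoff: factor * 1, factor * 2, factor * 4, ...
            sleep(backoff_factor * (2 ** attempt))
    raise RetriesExhausted("gave up after {} retries".format(total))


# A flaky stand-in service: two 503s, then success.
responses = iter([503, 503, 200])
delays = []
status = call_with_retries(lambda: next(responses), sleep=delays.append)
print(status, delays)
```

Passing sleep in as a parameter keeps the example fast to run and makes the delays easy to inspect. In real code the Retry object does all of this for you.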

For more information, see the urllib3 documentation

Refreshing AWS credentials in Python

In a recent post I covered using RefreshingAWSCredentials within the .NET AWS SDK to solve an issue with the way my current organisation has configured Single Sign On (SSO) and temporary credentials.

Essentially, the solution involves a background process updating a credential file, then using a time-limited credentials object to refresh the credentials.


The next issue to surface was satisfying the same requirement but for the Python based component of the 3rd party solution.

Refreshing Credential File

In this case, on a Red Hat instance, there is a cron job executing a Python script which handles the SSO process and writes the updated credentials and session token to a file which can be used by the 3rd party component.

Refreshing the Credentials in code

The existing code creates a session then creates the required resources. This works fine for the first hour, until the temporary credentials expire.

import boto3

region = 'eu-west-1'
queues = {}
session = boto3.Session()
queues['incoming'] = session.resource('sqs', region).get_queue_by_name(QueueName='incoming_queue')

Only a small amount of work is needed to make this refresh from the externally updated credential file. For this we’ll make use of RefreshableCredentials from botocore.credentials.

from botocore.credentials import RefreshableCredentials
from botocore.session import get_session
from configparser import ConfigParser
from datetime import datetime, timedelta, timezone

def refresh_external_credentials():
    # Re-read the credential file each time, as it is updated externally
    config = ConfigParser()
    config.read(credential_file_path)
    profile = config[profile_name]
    expiry = datetime.now(timezone.utc) + timedelta(minutes=refresh_minutes)
    return {
        "access_key": profile.get('aws_access_key_id'),
        "secret_key": profile.get('aws_secret_access_key'),
        "token": profile.get('aws_session_token'),
        "expiry_time": expiry.isoformat()
    }

There are a few config entries here.

  • credential_file_path is the location of the credential file that is getting externally updated
  • profile_name is the profile in the credential file that you want to use
  • refresh_minutes is the time before the AWS credential will expire and the refresh_external_credentials() function will get called.
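For reference, the credential file is a plain INI file, which is why ConfigParser can read it. The sketch below writes a throwaway file with placeholder values and reads one profile back. The profile name, the key values, the 45 minute default and the load_profile helper are all made up for illustration.

```python
import os
import tempfile
from configparser import ConfigParser
from datetime import datetime, timedelta, timezone

# Placeholder credential file contents (not real keys)
SAMPLE = """\
[task-account]
aws_access_key_id = AKIAEXAMPLEKEY
aws_secret_access_key = example-secret
aws_session_token = example-token
"""


def load_profile(credential_file_path, profile_name, refresh_minutes=45):
    """Read one profile and return it in the dict shape used above."""
    config = ConfigParser()
    if not config.read(credential_file_path):
        raise FileNotFoundError(credential_file_path)
    profile = config[profile_name]  # KeyError if the profile is missing
    expiry = datetime.now(timezone.utc) + timedelta(minutes=refresh_minutes)
    return {
        "access_key": profile.get("aws_access_key_id"),
        "secret_key": profile.get("aws_secret_access_key"),
        "token": profile.get("aws_session_token"),
        "expiry_time": expiry.isoformat(),
    }


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "credentials")
    with open(path, "w") as handle:
        handle.write(SAMPLE)
    creds = load_profile(path, "task-account")

print(sorted(creds))
```

Failing loudly on a missing file or profile is a deliberate choice here: a silent None for the access key would only show up later as a confusing authentication error.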

We now need to create the credential object for a session which will then be able to auto refresh.

session_credentials = RefreshableCredentials.create_from_metadata(
    metadata=refresh_external_credentials(),
    refresh_using=refresh_external_credentials,
    method='sts-assume-role'
)

Going back to the original code, the new session_credentials can be plugged in to support a long-lived application running against temporary tokens.

import boto3

# ideally taken from config
region = 'eu-west-1'
incoming_queue_name = 'incoming_queue'

session = get_session()
session._credentials = session_credentials
autorefresh_session = boto3.Session(botocore_session=session)

queues['incoming'] = autorefresh_session.resource('sqs', region).get_queue_by_name(QueueName=incoming_queue_name)

RefreshingAWSCredentials with .NET

Where I am currently working we have Single Sign On for AWS API calls and need to use task accounts to connect and get temporary credentials. As a result, it’s not very easy to have long running processes making calls to AWS APIs such as S3 and SQS.

I am working on a proof of concept which has a 3rd party .NET component which listens to SQS messages, calls into a proprietary API then dumps the results on S3. This code wasn’t written by people who knew about the hoops we need to jump through to authenticate.

To complicate things, the context the application runs under isn’t the context of the service account which has been granted rights to the appropriate role and the instance profile doesn’t have the correct rights. (For reasons I won’t go into, the Ops team won’t correct this).

So, having written that, I realise we’re looking at a very niche use case, but if it looks familiar, read on.


Behind the scenes, there is a PowerShell script running in the background as a scheduled task to go through Single Sign On and get new tokens to write to the credential file. I won’t go into any more detail than that as it’s very company specific.

As the 3rd party application creates the client on startup it uses the latest credentials but they don’t get refreshed from the credential file. I found an abstract class in the .NET SDK called RefreshingAWSCredentials which looked promising.

With this class, you can set an expiration on the credentials object so that any AWS SDK client using it for its API calls picks up the refreshed credentials. For example:

var s3Client = new AmazonS3Client(ExternalRefreshingAWSCredentials.Credentials);

This creates an S3 client that is given the refreshing credentials specified below.

using Amazon.Runtime;
using Amazon.Runtime.CredentialManagement;
using log4net;
using System;
using System.Configuration;

namespace AwsCredentialsExample.Credentials
{
    public class ExternalRefreshingAWSCredentials : RefreshingAWSCredentials
    {
        private static readonly object lockObj = new Object();
        private static readonly ILog Logger = LogManager.GetLogger(typeof(ExternalRefreshingAWSCredentials));
        private readonly string credentialFileProfile;
        private readonly string credentialFileLocation;
        private static ExternalRefreshingAWSCredentials refreshingCredentials;
        private CredentialsRefreshState credentialRefreshState;
        private int refreshMinutes = 45;

        // Lazily created singleton, guarded by double-checked locking
        public static ExternalRefreshingAWSCredentials Credentials {
            get {
                if (refreshingCredentials == null) {
                    lock (lockObj) {
                        if (refreshingCredentials == null) {
                            refreshingCredentials = new ExternalRefreshingAWSCredentials();
                        }
                    }
                }
                return refreshingCredentials;
            }
        }

        private ExternalRefreshingAWSCredentials()
        {
            credentialFileProfile = ConfigurationManager.AppSettings["CredentialFileProfile"];
            credentialFileLocation = ConfigurationManager.AppSettings["CredentialFileLocation"];
            // Optional override of the default refresh interval
            var interval = ConfigurationManager.AppSettings["ClientRefreshIntervalMinutes"];
            if (interval != null)
            {
                refreshMinutes = int.Parse(interval);
            }
            Logger.Info(string.Format("Credential file location is {0}", credentialFileLocation));
            Logger.Info(string.Format("Credential file profile is {0}", credentialFileProfile));
            credentialRefreshState = GenerateNewCredentials();
        }

        public override void ClearCredentials()
        {
            Logger.Info("Clearing the credentials");
            credentialRefreshState = null;
        }

        // Re-read the externally updated credential file
        protected override CredentialsRefreshState GenerateNewCredentials()
        {
            Logger.Info(string.Format("Generating credentials, valid for {0} minutes", refreshMinutes));
            var credFile = new StoredProfileAWSCredentials(credentialFileProfile, credentialFileLocation);
            return new CredentialsRefreshState(credFile.GetCredentials(), DateTime.Now.AddMinutes(refreshMinutes));
        }

        public override ImmutableCredentials GetCredentials()
        {
            if (credentialRefreshState == null || credentialRefreshState.Expiration < DateTime.Now)
            {
                credentialRefreshState = GenerateNewCredentials();
            }
            return credentialRefreshState.Credentials;
        }
    }
}


There are three configurations to be used with this. These should be added as AppSettings in the app.config

  1. CredentialFileLocation - The location of the credential file that is being updated externally (in this case by PowerShell)
  2. CredentialFileProfile - The profile from the credential file to use
  3. ClientRefreshIntervalMinutes - How long to keep the credentials before expiring them (defaults to 45 minutes)

As suggested before, you can now create your AWS clients passing in the Credentials property in place of any of the normally used Credentials objects.

var s3Client = new AmazonS3Client(ExternalRefreshingAWSCredentials.Credentials);

Generating test data with Faker

Python is one of those languages where if you can conceive it, there is probably already a library for it.

One great library is Faker. It makes the generation of sensible test data much easier and removes a lot of the issues around having to use unrealistic junk values when creating it on your own.

There is lots to see, and you’re probably best off reading the docs, but this is to give you an overview.


Installation is simple, just use pip to install;

pip install faker


Now that you have it installed, you can use python REPL or ptpython to have a play.

Some basics

from faker import Factory

fake = Factory.create()
fake.first_name(), fake.last_name(), fake.email()

This will give you a tuple with a random name and email;

('Mary', 'Bennett', '')


If you want to get UK post codes, you can tell the factory a localisation to use when generating the data;

from faker import Factory

fake = Factory.create('en_GB')

fake.street_address(), fake.postcode()

which will yield;

('Studio 54\nCollins fork', 'L2 7XP')

Synchronising Multiple Fakes

Every time you call a method on the fake object you get a new value. If you want to synchronise two fake objects you can seed them with the same value. Each consecutive call on one fake will then return the same value as the corresponding call on the other.

This is probably more easily demonstrated in code;

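from faker import Factory

fake1 = Factory.create()
fake2 = Factory.create()

# Seed both generators with the same (arbitrary) value
fake1.seed_instance(1234)
fake2.seed_instance(1234)

fake1.name(), fake2.name()
fake1.name(), fake2.name()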

This will result in a tuple containing the same name across synchronised fake objects.

('Adam Bryan', 'Adam Bryan')
('Jacob Lee', 'Jacob Lee')
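The same principle can be seen with the standard library’s random module, which is a useful way to think about what seeding does: two generators given the same seed produce the same sequence. This is an analogy rather than Faker’s own code, and the seed value and name list are arbitrary.

```python
import random

NAMES = ["Adam", "Jacob", "Mary", "Robin", "Amy", "Clifford"]

# Two independent generators seeded with the same value
rng1 = random.Random(1234)
rng2 = random.Random(1234)

# Draw from each generator in lockstep
pairs = [(rng1.choice(NAMES), rng2.choice(NAMES)) for _ in range(3)]
for pair in pairs:
    print(pair)

# Every pair holds the same name twice
print(all(first == second for first, second in pairs))
```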

Making it a bit more interesting

In a previous post I fiddled with credit card data where I created test data. Faker can be used to help out here. The code below isn’t an example of amazing Python, it’s just simple code to show it working.

First, we bring in the imports that are going to be used;

import csv
import random
from faker import Factory
from faker.providers import credit_card

Some helper methods, these are just to keep things clean

def get_transaction_amount():
    return round(random.randint(1, 1000) * random.random(), 2)

def get_transaction_date(fake):
    return fake.date_time_between(start_date='-30y', end_date='now').isoformat()

Some more helpers for the creation of records for our customer and transaction

def create_customer_record(customer_id):
    fake = Factory.create('en_GB')
    return [customer_id, fake.first_name(), fake.last_name(), fake.street_address().replace('\n', ', '), fake.city(), fake.postcode(), fake.email()]

def create_financials_record(customer_id):
    fake = Factory.create('en_GB')
    return [customer_id, fake.credit_card_number(), get_transaction_amount(), get_transaction_date(fake)]

A helper function to save the records to file

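def flush_records(records, filename):
    # Append so that successive batches accumulate in the same file
    with open(filename, 'a', newline='') as file:
        csv_writer = csv.writer(file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for record in records:
            csv_writer.writerow(record)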
Finally the main calling block to create the records

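def create_customer_files(customer_count=100):
    customer_records = []
    financial_records = []
    for id in range(1, customer_count + 1):
        customer_id = str(id).zfill(10)
        customer_records.append(create_customer_record(customer_id))
        financial_records.append(create_financials_record(customer_id))
        # Write out in batches of 100 to keep memory usage down
        if len(customer_records) == 100:
            flush_records(customer_records, 'customer.csv')
            flush_records(financial_records, 'financials.csv')
            customer_records.clear()
            financial_records.clear()
    # Write out whatever is left over from the last batch
    flush_records(customer_records, 'customer.csv')
    flush_records(financial_records, 'financials.csv')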


Once we run this, we’ll have 2 files with customer details and a credit card transaction.

Customer records

0000000001,Clifford,Turner,"Flat 17, Smith crescent",Johnsonborough,DN5 7JJ,
0000000002,Amy,Clements,"Studio 96s, Anne harbor",Maureenfurt,LA53 8HZ,
0000000003,Robin,Sinclair,5 Lesley motorway,Bryanbury,E2 9EU,

Financial records


By sharing the customer ID across both files we have some semblance of referential integrity.
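A quick way to confirm that is to check that every customer ID in the financials file also appears in the customer file. A minimal sketch using only the csv module follows. orphaned_ids is a hypothetical helper, and the in-memory sample rows (including the card numbers, amounts and dates) are placeholders rather than real output.

```python
import csv
import io


def orphaned_ids(customer_file, financials_file):
    """Return IDs found in the financials rows but not in the customer rows.

    Both arguments are open text files (or file-like objects) whose first
    column is the customer ID.
    """
    customer_ids = {row[0] for row in csv.reader(customer_file) if row}
    financial_ids = {row[0] for row in csv.reader(financials_file) if row}
    return sorted(financial_ids - customer_ids)


customers = io.StringIO(
    '0000000001,Clifford,Turner,"Flat 17, Smith crescent",Johnsonborough,DN5 7JJ\n'
    '0000000002,Amy,Clements,"Studio 96s, Anne harbor",Maureenfurt,LA53 8HZ\n'
)
financials = io.StringIO(
    '0000000001,0000000000000000,12.34,2019-01-19T16:26:34\n'
    '0000000003,0000000000000000,56.78,2019-01-19T16:26:34\n'
)

missing = orphaned_ids(customers, financials)
print(missing)
```

With the real files you would pass in the handles from open('customer.csv', newline='') and open('financials.csv', newline='') instead of the StringIO objects.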

This code only creates a single transaction per customer. It can easily be modified to create multiple transactions by adjusting create_financials_record to take an optional argument of transaction_count=1 and updating the append to handle a list of lists.
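That change might look something like the sketch below. To keep it runnable without Faker, the card number is passed in and the transaction date is drawn with the random module. create_financials_records (plural) and its parameters are hypothetical names, not code from the original script.

```python
import random
from datetime import datetime, timedelta


def get_transaction_amount():
    return round(random.randint(1, 1000) * random.random(), 2)


def get_transaction_date(max_age_days=30 * 365):
    """Random timestamp within roughly the last 30 years."""
    offset = timedelta(seconds=random.randint(0, max_age_days * 24 * 3600))
    return (datetime.now() - offset).isoformat()


def create_financials_records(customer_id, card_number, transaction_count=1):
    """Return a list of rows, one per transaction, for a single customer."""
    return [
        [customer_id, card_number, get_transaction_amount(), get_transaction_date()]
        for _ in range(transaction_count)
    ]


financial_records = []
# extend() rather than append(), because each call now returns several rows
financial_records.extend(
    create_financials_records("0000000001", "0000000000000000", transaction_count=3)
)
print(len(financial_records))
```

Using extend keeps financial_records a flat list of rows, so flush_records can write it without any changes.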