TFL Cycling DataSet - Part 2

Following on from part 1 of this mini series, I’ve got my local environment ready to go and I have pulled down some test data to work with.

The next step is to start having a look at some of the data.

Loading in the data

We know that the data is in csv format, so we can use the Spark read functionality to bring it in. With the single file in this local environment it’s a case of:

data = spark.read.csv('01aJourneyDataExtract10Jan16-23Jan16.csv', header=True, inferSchema=True)

# remove the spaces from the column names, e.g. "Rental Id" -> "RentalId"
for col in data.columns:
    data = data.withColumnRenamed(col, col.replace(" ", ""))

These lines create a DataFrame called data, load the csv input into it, and remove the spaces from the column names. By setting header to True we are saying that the first row of the data is a header row.

inferSchema will ask Spark to have a go at working out the correct types for the columns that are being brought in.
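
To double-check what was actually inferred, a quick sanity check (not part of the original flow) is to print the schema from the notebook:

# show each column with the type Spark inferred for it
data.printSchema()

# or grab the same information as a list of (name, type) tuples
print(data.dtypes)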

Quick Cleanup

Even though inferSchema was used, if we call data.describe() we can see that the date columns have come through as strings. I’m going to put that down to the fact that these dates are in UK format (dd/MM/yyyy HH:mm), which the schema inference wasn’t expecting.

DataFrame[summary: string
, RentalId: string
, Duration: string
, BikeId: string
, EndDate: string
, EndStationId: string
, EndStationName: string
, StartDate: string
, StartStationId: string
, StartStationName: string]

I think I’m going to want these to be dates later on, so I’m going to convert them to timestamps now.

from pyspark.sql.functions import unix_timestamp
dated_data = data.select('RentalId' \
           ,unix_timestamp('StartDate', 'dd/MM/yyyy HH:mm').cast("double").cast("timestamp").alias('StartDate') \
           ,unix_timestamp('EndDate', 'dd/MM/yyyy HH:mm').cast("double").cast("timestamp").alias('EndDate') \
           ,'Duration' \
           ,'StartStationId' \
           ,'EndStationId')

This block uses the unix_timestamp function to get the long number representation of the date, which we can then cast to the timestamp type. By passing the format of the date we get around it being in a format that inferSchema wasn’t expecting. I’ve used .alias() to specify the name of the derived column.
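
As a quick sanity check (again, not part of the original steps), the new schema and a few converted rows can be inspected with:

# confirm StartDate and EndDate are now timestamps
dated_data.printSchema()

# eyeball a few converted rows
dated_data.show(5, truncate=False)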

Getting the StationId Data

There is an API which we can use to get additional data for the StartStationId and EndStationId columns; it’s documented on the TfL website.

We need a list of all the start and end bike points/stations that are in the dataset, so I went with a union to get it.

stationids = sorted((data.select('StartStationId') \
                    .union(data.select('EndStationId'))) \
                    .rdd.map(lambda r: r[0]) \
                    .distinct() \
                    .collect())

This will return a sorted list of all the Ids in the dataset, which can be passed into a helper method that calls the API mentioned above.

import csv
import json
import os

import requests


def get_bike_points(points_ids):
    # open() won't expand '~', so do it explicitly
    bike_point_file = os.path.expanduser('~/datasets/cycling/bike_points.csv')
    base_url = 'https://api.tfl.gov.uk/BikePoint/BikePoints_'

    with open(bike_point_file, 'w') as csvfile:
        fieldnames = ['pointId', 'commonName', 'lat', 'lon', 'placeType', 'properties']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for point in points_ids:
            if point is None:
                continue
            url = "%s%s" % (base_url, point)
            req = requests.get(url)
            if req.status_code != 200:
                writer.writerow({"pointId": point, "commonName": "Not Found"})
            else:
                bike_point = json.loads(req.text)
                props = {}
                # flatten the additionalProperties key/value pairs into a single dict
                if 'additionalProperties' in bike_point:
                    for p in bike_point['additionalProperties']:
                        props[p['key']] = p['value']
                writer.writerow({"pointId": point, "commonName": bike_point['commonName'], "lat": bike_point['lat'],
                                 "lon": bike_point['lon'], "placeType": bike_point['placeType'], 'properties': props})

This block takes the list of Ids, collects the data for each bike station from the API, extracts what is wanted from the returned payload, then saves it into a csv file.
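
Calling the helper is then just a case of passing in the list of Ids built earlier (the output path is whatever the helper above writes to):

# fetch the details for every station id seen in the journey data
# (one HTTP request per id, so this takes a little while)
get_bike_points(stationids)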

Cleaning up the StationId data

Some of the stations in the dataset aren’t there anymore, so we get a 404 when we hit the API. To get round this I’ve just written a row for the Id with a commonName of "Not Found".

That said, we do have this information in the original data set, so a bit of fiddling can be used to update the bike_points data with the correct commonName.

bike_points = spark.read.csv('bike_points.csv', header=True, inferSchema=True)

combined_bike_points = bike_points.where(bike_points.commonName == "Not Found") \
                      .join(data, data.StartStationId == bike_points.pointId)\
                      .select(bike_points.pointId \
                            , data.StartStationName.alias("commonName") \
                            , bike_points.lat \
                            , bike_points.lon \
                            , bike_points.placeType \
                            , bike_points.properties) \
                      .distinct()

bike_points = combined_bike_points \
              .union(bike_points \
                     .where(bike_points.commonName != "Not Found"))

Okay, long-winded, but we now have the station data to work with too.
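
As a quick sanity check (not part of the original write-up), we can confirm there are no "Not Found" names left and eyeball a few rows:

# should print 0 if the join filled in every missing name
print(bike_points.where(bike_points.commonName == "Not Found").count())
bike_points.show(5, truncate=False)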

TFL Cycling DataSet - Part 1

I’m hoping this will be a reasonably accurate account of my play with the TfL Cycling DataSets.

I’m still forming my plan, but, loosely, I think I want to end up with a visualisation where the bike points are highlighted over a time series as bikes are taken and returned.

Initially, I’m working on my Mac, but I have a Databricks community cluster that I’ve migrated some of the parts to.

Preparing my Local Env

As I said, I’m using my MacBook, so I’m going to install a couple of things.

Install Spark

To install Spark, I use brew (the formula is apache-spark):

brew install apache-spark

Install Jupyter

Installing jupyter notebooks is done with pip

pip install jupyter

Getting some data

I took a single file from the S3 bucket to play with locally; for no particular reason I went with 01aJourneyDataExtract10Jan16-23Jan16.csv

aws s3 cp s3://cycling.data.tfl.gov.uk/usage-stats/01aJourneyDataExtract10Jan16-23Jan16.csv ~/datasets/cycling/.

Starting Up

Run the following commands to get your Jupyter Notebook up and running

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
pyspark

Quick Test

Finally, a quick test to see how it looks. In the Jupyter notebook I can do

data = spark.read.csv('~/datasets/cycling/01aJourneyDataExtract10Jan16-23Jan16.csv', header=True, inferSchema=True)
data.show()

This should show you 20 rows from the data set and we’re off.

CSV to Markdown table - Sublime Package

Now that I’m writing almost all documentation in markdown and then using Pandoc to convert it to MediaWiki or docx as required, I needed to find an easier way to quickly create my tables.

It doesn’t do anything fancy, but I created a sublime package to do the conversion of a csv formatted table into a markdown table.

Assumptions

The following assumptions are made about the csv

  • You’ve got headers in the first row
  • Any empty cells are correctly formatted with commas
  • You don’t have any commas in the values

Creating the Plugin

Creating a new Plugin with Sublime Text 3 is a case of Tools -> Developer -> New Plugin

This will create a new templated file in the User section.

import sublime
import sublime_plugin


class CsvToMdCommand(sublime_plugin.TextCommand):

    content = ""

    def run(self, edit):
        for region in self.view.sel():
            if not region.empty():
                s = self.view.substr(region)
                self.process(s)
                self.view.replace(edit, region, self.content)
                self.content = ""

    def process_row(self, row, isHeader=False):
        self.content += ('|' + row.replace(',', '|') + '|' + '\n')
        if isHeader:
            self.content += '|' + ('-|' * (row.count(',') + 1) + '\n')

    def process(self, rows):
        first = True
        for row in rows.split("\n"):
            self.process_row(row.strip(), first)
            if first:
                first = False

To add the Command Palette command, use a file with the extension .sublime-commands in the Packages/User folder

[
	{"caption" : "CSV to MD: Convert", "command" : "csv_to_md"}
]
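
With that in place you can select a csv block and run "CSV to MD: Convert" from the Command Palette. To see what the conversion produces without opening Sublime, here’s a minimal standalone sketch of the same row handling (the csv_to_md function and the sample data are just for illustration):

def csv_to_md(rows):
    # mirror process/process_row: pipe-delimit each row, add a divider after the header
    out = ""
    first = True
    for row in rows.split("\n"):
        row = row.strip()
        out += '|' + row.replace(',', '|') + '|' + '\n'
        if first:
            out += '|' + ('-|' * (row.count(',') + 1)) + '\n'
            first = False
    return out

print(csv_to_md("name,team,wins\nalice,red,3\nbob,blue,"))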

Go routines and channels

I’m having a bit of a dabble with Go, as a by-product of working with Elasticsearch Beats.

One thing I’ve been looking at today is channels, which allow two goroutines to communicate with each other, and I came up with a fairly cheesy way to play with implementing them.

package main

import "fmt"

func ping(c chan string) {
	for {
		msg := <-c
		if msg == "pong" {
			println(" .... " + msg)
			c <- "ping"
		}
	}
}

func pong(c chan string) {
	for {
		msg := <-c
		if msg == "ping" {
			print(msg)
			c <- "pong"
		}
	}
}

func main() {
	var c chan string = make(chan string)

	go ping(c)
	go pong(c)

	c <- "ping"
	var input string
	fmt.Scanln(&input)
}

The go keyword essentially starts ping and pong on a logical processor so they run concurrently. They both get passed the same chan string, which acts as a shared channel for them to synchronise on.

The code results in an endless game of ping pong.

ping .... pong
ping .... pong
ping .... pong
ping .... pong
ping .... pong
ping .... pong

There you go. Almost certainly not the best Go ever written, but a start.

Monit

There are lots of monitoring and alerting tools out there and I’m sure everyone has their own preference on which they’re going to use.

We have selected Monit for simple monitoring of disk space, tunnels and processes because it’s simple to set up and does exactly what we’re asking it to do.

I particularly like the DSL for defining which checks you want to perform.

As we’re running Monit on multiple machines, we’re also evaluating M/Monit, which centralises the monitoring of all the separate instances in a nice dashboard.

Installing Monit

Our servers are Red Hat, so we’re not using yum install monit, which would get you started on a Fedora machine. Equally, the downloads page on the Monit site will give you a quick and easy installation for other common platforms.

# create the install folder
sudo mkdir /opt/monit
cd /opt/monit

# get the latest release (the folder is root-owned, hence the sudo)
sudo wget http://mmonit.com/monit/dist/binary/5.21.0/monit-5.21.0-linux-x64.tar.gz

# unpack
sudo tar -xvf monit-5.21.0-linux-x64.tar.gz
sudo rm monit-5.21.0-linux-x64.tar.gz
sudo mv monit-5.21.0 5.21.0
cd 5.21.0

# put some links in
sudo ln -s /opt/monit/5.21.0/conf/monitrc /etc/monitrc
sudo ln -s /opt/monit/5.21.0/bin/monit /usr/bin/monit

Configuring Monit

Now the links are in, we can edit the Monit config file, monitrc. The actual file has huge amounts of documentation; I’m going to limit this to the key points to get up and running.

sudo vi /etc/monitrc

######################
## Monit control file
######################

set daemon  30   # check services at 30 second intervals

set logfile syslog

# configure the mmonit to report to
# set mmonit https://monit:monit@10.10.1.10:8443/collector

# configure email alerts (this is using AWS SES)
SET ALERT alerts@mycompany.com
SET MAILSERVER email-smtp.eu-west-1.amazonaws.com port 587
        username "" password ""
        using TLSV1
        with timeout 30 seconds

set mail-format {
from: my_ses_registered_email@mycompany.com
reply-to: my_ses_registered_email@mycompany.com
subject: $EVENT
message:
Monit
=====
Date:    $DATE
Host:    $HOST
Action:  $ACTION
Service: $SERVICE
Event:   $EVENT

Description:
============
$DESCRIPTION.
}

# configure the host connection
set httpd port 2812 and
  use address 0.0.0.0
  allow 0.0.0.0
  allow admin:monit # change the password


# configure checks - for example processes
CHECK PROCESS tunnel_somewhere MATCHING '.*?(autossh.*?(8081))'

# or disk space
CHECK DEVICE root WITH PATH /
  IF SPACE usage > 80% THEN ALERT

# or network connections
check network eth0 with interface eth0
  IF upload > 1 MB/s THEN ALERT
  IF total downloaded > 1 GB in last 2 hours THEN ALERT
  IF total downloaded > 10 GB in last day THEN ALERT

Starting Monit

To start Monit, use

sudo monit start

If you make changes to /etc/monitrc then you can reload it with sudo monit reload