Argument defaults in shell scripts

When writing a shell script, I regularly find that I want to be able to pass an argument into the script, but only sometimes. For example, I might want the script to write to the /tmp folder most of the time, but keep the option to choose the output location myself.

Default values for arguments can be set in a script using the following simple syntax:

#!/bin/sh

# example script to write to output folder

OUTPUT_PATH=${1:-/tmp/output}

echo "some arbitrary process" > ${OUTPUT_PATH}/arbitrary_output.output

This will either use the first parameter passed in as the output path, or fall back to the default value of /tmp/output if one isn’t provided.

sh example_script.sh # outputs to /tmp/output

sh example_script.sh /var/tmp/special # outputs to /var/tmp/special

Running Spark against HBase

It’s reasonably easy to run a Spark job against HBase using the newAPIHadoopRDD method available on the SparkContext.

The general steps are:

  1. create an HBaseConfiguration
  2. create a SparkContext
  3. create a newAPIHadoopRDD
  4. perform job action

To get this working, you’re going to need the HBase libraries in your build.sbt file. I’m using HBase 1.1.2 at the moment so that’s the version I’m pulling in.

"org.apache.hbase" % "hbase-shaded-client" % "1.1.2"
"org.apache.hbase" % "hbase-server" % "1.1.2"

Creating the HBaseConfiguration

This requires, at a minimum, the ZooKeeper quorum. In my environment the test and production clusters have different ZOOKEEPER_ZNODE_PARENT values, so I’m passing that in to override the default.

def createConfig(zookeeper: String, hbaseParentNode: String, tableName: String): Configuration = {
  val config = HBaseConfiguration.create()
  config.set("zookeeper.znode.parent", hbaseParentNode)
  config.set("hbase.zookeeper.quorum", zookeeper)
  config.set("hbase.mapreduce.inputtable", tableName)
  config
}

Creating the SparkContext

The SparkContext is going to be the main engine of the job. At a minimum we just need a SparkConf with the job name.

val conf = new SparkConf().setAppName(jobname)
val spark = new SparkContext(conf)

Creating the newAPIHadoopRDD

We have an HBaseConfiguration and a SparkContext, so now we can create the newAPIHadoopRDD. The newAPIHadoopRDD needs the config with the namespace-qualified table name, and needs to know to use TableInputFormat as the InputFormat. We expect the keys to be of class ImmutableBytesWritable and the values to be Result.

val zookeeper = "hbase-box1:2181,hbase-box2:2181"
val hbaseParentNode = "/hbase"
val tableName = "credit_data:accounts"

val config = createConfig(zookeeper, hbaseParentNode, tableName)


val hBaseRDD = spark.newAPIHadoopRDD(config,
                classOf[TableInputFormat],
                classOf[ImmutableBytesWritable],
                classOf[Result])

Performing the Job Action

That’s all we need; we can now run our job. It’s contrived, but consider the following table.

key              | d:cl    | d:cb
1234678838472938 | 1000.00 | 432.00
9842897418374027 | 100.00  | 95.70
7880927412346013 | 600.00  | 523.30

In our table, the row key is the credit card number and there is a column family d which holds the column qualifiers cl (credit limit) and cb (current balance).

For this job, I want to know all the accounts which are at >90% of their available credit.

case class Account(ccNumber: String, limit: Double, balance: Double)

val accountsRDD = hBaseRDD.map(r => {
    // r._1 is the row key (ImmutableBytesWritable), r._2 is the Result for that row
    val key = Bytes.toStringBinary(r._1.get())
    val familyMap = r._2.getFamilyMap(Bytes.toBytes("d"))
    val limit = Bytes.toDouble(familyMap.get(Bytes.toBytes("cl")))
    val balance = Bytes.toDouble(familyMap.get(Bytes.toBytes("cb")))
    Account(key, limit, balance)
})

That gives us a nicely typed RDD of Accounts to do our filtering on.

val eligibleAccountsRDD = accountsRDD.filter(a => {
    (a.balance / a.limit) > 0.9
})

That gives us the matching accounts, from which we can extract the account numbers and save them to disk.

val accountNoRDD = eligibleAccountsRDD.map(a => a.ccNumber)
accountNoRDD.saveAsTextFile("/save/location")

The save location will now be a folder with the created part-xxxxx files containing the results. In our case…

9842897418374027
7880927412346013

Replacing an incorrect git commit message

If you have committed some code to git (or in the current case, BitBucket) and you have made an error in the commit message (in the current case, referenced the wrong Jira ticket), all is not lost.

To replace the commit message perform the following actions.

git commit --amend

Change the commit message; in my case, from

FOO-1234 - fix the bar
 - add some stuff

to

FOO-1235 - fix the bar
 - add some stuff

Then all that is required is a push with --force (note this rewrites the remote history, so only do it on a branch nobody else is working from).

git push --force

TFL Cycling DataSet - Part 2

Following on from part 1 of this mini series, I’ve got my local environment ready to go and have pulled down some test data to work with.

The next step is to start having a look at some of the data.

Loading in the data

We know that the data is in csv format, so we can use the Spark read functionality to bring it in. With the single file in this local environment it’s a case of:

data = spark.read.csv('01aJourneyDataExtract10Jan16-23Jan16.csv', header=True, inferSchema=True)
for col in data.columns:
    data = data.withColumnRenamed( col, col.replace(" ", ""))

This will create a DataFrame called data, load the csv input into it, and strip the spaces out of the column names. By setting header to True we are saying that the first row of the data is a header row.

inferSchema asks Spark to have a go at working out the correct types for the columns that are being brought in.
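
As a quick sanity check (a minimal sketch), we can confirm the spaces were stripped from the column names and peek at a few rows:

print(data.columns)
data.show(5)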

Quick Cleanup

Even though inferSchema was used, if we call data.describe() we can see that the type of the dates is string. I’m going to put that down to the fact that these dates are in UK format.

DataFrame[summary: string
, RentalId: string
, Duration: string
, BikeId: string
, EndDate: string
, EndStationId: string
, EndStationName: string
, StartDate: string
, StartStationId: string
, StartStationName: string]

I think I’m going to want these to be dates later on, so I’m going to convert them to timestamps now.

from pyspark.sql.functions import unix_timestamp
dated_data = data.select('RentalId' \
           ,unix_timestamp('StartDate', 'dd/MM/yyyy HH:mm').cast("double").cast("timestamp").alias('StartDate') \
           ,unix_timestamp('EndDate', 'dd/MM/yyyy HH:mm').cast("double").cast("timestamp").alias('EndDate') \
           ,'Duration' \
           ,'StartStationId' \
           ,'EndStationId')

This block uses the unix_timestamp function to get the long number representation of the date, which we can then turn into the timestamp type. By passing the format of the date we can solve the issue of it being in a format that inferSchema wasn’t expecting. I’ve used .alias() to specify the name of each derived column.
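
To confirm the conversion worked, something like the following (a quick sketch against the dated_data DataFrame above) should now report StartDate and EndDate as timestamp columns:

dated_data.printSchema()
dated_data.show(5, truncate=False)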

Getting the StationId Data

There is an API which we can use to get additional data for the StartStationId and EndStationId. This can be found on the TfL website.

We need a list of all the start and end bike point/stations that are in the dataset so I went for doing a union to get this.

stationids = sorted((data.select('StartStationId') \
                    .union(data.select('EndStationId'))) \
                    .rdd.map(lambda r: r[0]) \
                    .distinct() \
                    .collect())

This returns a sorted list of all the IDs in the dataset, which can be passed into a helper method that calls the API mentioned above.

import csv
import json
import os

import requests

def get_bike_points(points_ids):
    bike_point_file = os.path.expanduser('~/datasets/cycling/bike_points.csv')
    base_url = 'https://api.tfl.gov.uk/BikePoint/BikePoints_'

    with open(bike_point_file, 'w') as csvfile:
        fieldnames = ['pointId', 'commonName', 'lat', 'lon', 'placeType', 'properties']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for point in points_ids:
            if point is None:
                continue
            url = "%s%s" % (base_url, point)
            req = requests.get(url)
            if req.status_code != 200:
                # station no longer exists; record a placeholder row
                writer.writerow({"pointId": point, "commonName": "Not Found"})
            else:
                bike_point = json.loads(req.text)
                props = {}
                if 'additionalProperties' in bike_point:
                    for p in bike_point['additionalProperties']:
                        props[p['key']] = p['value']
                writer.writerow({"pointId": point, "commonName": bike_point['commonName'], "lat": bike_point['lat'],
                                 "lon": bike_point['lon'], "placeType": bike_point['placeType'], 'properties': props})

This block takes the list of IDs, collects the data for each bike station, extracts what is wanted from the returned payload, and then saves it all into a csv file.
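
With the helper in place, it can simply be fed the sorted list of IDs collected earlier. As a usage sketch (note this makes one HTTP request per station, so it takes a little while):

get_bike_points(stationids)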

Cleaning up the StationId data

Some of the stations in the dataset aren’t there anymore, so we get a 404 when we hit the API. To get round this I’ve just created a line for the ID with a commonName of "Not Found".

That said, we do have this information in the original data set, so a bit of fiddling can be used to update the bike_points data with the correct commonName.

bike_points = spark.read.csv('bike_points.csv', header=True, inferSchema=True)

combined_bike_points = bike_points.where(bike_points.commonName == "Not Found") \
                      .join(data, data.StartStationId == bike_points.pointId)\
                      .select(bike_points.pointId \
                            , data.StartStationName.alias("commonName") \
                            , bike_points.lat \
                            , bike_points.lon \
                            , bike_points.placeType \
                            , bike_points.properties) \
                      .distinct()

bike_points = combined_bike_points \
              .union(bike_points \
                     .where(bike_points.commonName != "Not Found"))

Okay, a little long-winded, but we now have the station data to work with too.


TFL Cycling DataSet - Part 1

I’m hoping this will be a reasonably accurate account of my play with the TfL Cycling DataSets.

I’m still forming my plan, but loosely I think I want to end up with a visualisation where the bike points are highlighted over a time series as bikes are taken and returned.

Initially, I’m working on my Mac, but I have a Databricks community cluster that I’ve migrated some of the parts to.

Preparing my Local Env

As I said, I’m using my MacBook so I’m going to install a couple of things

Install Spark

To install Spark, I use brew

brew install apache-spark

Install Jupyter

Installing Jupyter notebooks is done with pip

pip install jupyter

Getting some data

I took a single file from the S3 bucket to play with locally; for no particular reason I went with 01aJourneyDataExtract10Jan16-23Jan16.csv

aws s3 cp s3://cycling.data.tfl.gov.uk/usage-stats/01aJourneyDataExtract10Jan16-23Jan16.csv ~/datasets/cycling/.

Starting Up

Run the following commands to get your Jupyter Notebook up and running

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
pyspark

Quick Test

Finally, a quick test to see how it looks. In the Jupyter notebook I can do

data = spark.read.csv('~/datasets/cycling/01aJourneyDataExtract10Jan16-23Jan16.csv', header=True, inferSchema=True)
data.show()

This should show you 20 rows from the data set and we’re off.