Client-side encryption using Boto3 and AWS KMS

Towards the end of 2014 Amazon released KMS, a cheaper, cut-down Key Management Service compared with the CloudHSM offering (although it still uses hardware HSMs underneath).

The KMS service can be accessed through the IAM console - the bottom option on the left-hand menu is Encryption Keys. Make sure you change the region filter to the correct region before creating or trying to view your customer keys.

To create the customer key, click the Create Key button and follow the instructions to create a new master key - take a note of the Key ID and you’re ready to go.

You need a couple of libraries before you start; for testing I use virtualenv:

bin/pip install boto3

bin/pip install pycrypto
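
As an aside, the master key can also be created from code rather than through the console - a minimal sketch with Boto3, assuming your credentials and region are already configured (the description string is just an example):

import boto3

kms = boto3.client('kms')

# create a new customer master key - the description is only illustrative
response = kms.create_key(Description='client side encryption demo key')

# the KeyId is the GUID you would otherwise note down from the console
print(response['KeyMetadata']['KeyId'])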

## Encrypting

I’m using the PyCrypto library for no other reason than it appeared most often in the results when I was looking for a library.

I won’t go into much detail on the code because I don’t know much about encryption - I cobbled this together from the information on the PyCrypto page.

The key that is supplied to these functions is the data key generated by the AWS Key Management Service.

from Crypto import Random
from Crypto.Cipher import AES

def pad(s):
    return s + b"\0" * (AES.block_size - len(s) % AES.block_size)

def encrypt(message, key, key_size=256):
    message = pad(message)
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return iv + cipher.encrypt(message)

def decrypt(ciphertext, key):
    iv = ciphertext[:AES.block_size]
    cipher = AES.new(key, AES.MODE_CBC, iv)
    plaintext = cipher.decrypt(ciphertext[AES.block_size:])
    return plaintext.rstrip(b"\0")

def encrypt_file(file_name, key):
    with open(file_name, 'rb') as fo:
        plaintext = fo.read()
    enc = encrypt(plaintext, key)
    with open(file_name + ".enc", 'wb') as fo:
        fo.write(enc)

def decrypt_file(file_name, key):
    with open(file_name, 'rb') as fo:
        ciphertext = fo.read()
    dec = decrypt(ciphertext, key)
    with open(file_name[:-4], 'wb') as fo:
        fo.write(dec)

## Creating the data key to encrypt

For each item I want to encrypt I am going to create a new data key - this is a key that is generated in KMS and encrypted with the customer master key.

The call to the API returns the plaintext key and the ciphertext version for storage alongside the encrypted file (in the case of S3 you could upload the base64-encoded version as a metadata value).

In this code, customer_key is the KeyId from the AWS console for the key you created at the start - it’s a GUID.

import boto3

kms = boto3.client('kms')
data_key_req = kms.generate_data_key(KeyId=customer_key, KeySpec='AES_256')
data_key = data_key_req['Plaintext']
data_key_ciphered = data_key_req['CiphertextBlob']

encrypt_file(filepath, data_key)

This will create a new encrypted file alongside the original - for the file test.txt it would create a new file test.txt.enc.

If you were going to upload to S3, you might use something like:

import base64

s3 = boto3.client('s3')
s3.put_object(Bucket='mybucketname', Body=open('test.txt.enc', 'rb'),
              Key='test.txt',
              Metadata={'encryption-key': base64.b64encode(data_key_ciphered).decode('utf-8')})
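
To get the file back you reverse the process: pull the object and its metadata, ask KMS to decrypt the stored data key, then decrypt the file locally. A rough sketch, reusing the helpers and example names from above:

import base64
import boto3

kms = boto3.client('kms')
s3 = boto3.client('s3')

# fetch the encrypted object and the base64-encoded data key from its metadata
obj = s3.get_object(Bucket='mybucketname', Key='test.txt')
data_key_ciphered = base64.b64decode(obj['Metadata']['encryption-key'])

# KMS decrypts the data key using the master key it was generated under
data_key = kms.decrypt(CiphertextBlob=data_key_ciphered)['Plaintext']

# write the downloaded ciphertext to disk and decrypt it with the earlier helper
with open('test.txt.enc', 'wb') as fo:
    fo.write(obj['Body'].read())
decrypt_file('test.txt.enc', data_key)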


Adventures with Spark, part two

Some time ago, back in September, I wrote a post on starting my adventures with Spark but didn’t progress things very far.

One thing that was holding me back was a reasonably real-world problem to use as a learning case. I recently came across a question which seemed like a good starting point, and for the last few evenings I have been working on a solution.

The problem

A credit card company is receiving transaction data from around the world and needs to be able to spot fraudulent usage from the transactions.

To simplify this use case, I’m going to pick one fabricated indicator of fraudulent usage and focus on that.

  • An alert must be raised if a credit card makes £10,000 of purchases within a 10-minute sliding window

For the purposes of this learning project I am going to assume the following:

  • There is a high volume of transactions
  • No data needs to be retained
  • Once an alert has been raised, a black box system will react to it

The solution

From the outset, this problem seems perfectly suited to Spark Streaming, and with the high volume it’s going to need a queue to manage the incoming transaction data.

I’m going to create a basic producer to pump transactions into Kafka to simulate the inbound transactions.

I don’t want to detail the process of installing Kafka and getting Spark set up - I’m using a MacBook and used brew to get everything installed, and I’m using SBT for the solution, which can be found on GitHub.

Step 1: Start ZooKeeper for Kafka


# in my case $KAFKA_HOME = /usr/local/Cellar/kafka_2.10-0.8.1.1/
cd $KAFKA_HOME
./bin/zookeeper-server-start.sh config/zookeeper.properties

Step 2: Start the Kafka server


cd $KAFKA_HOME
./bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

Step 3: Create the Kafka topic


cd $KAFKA_HOME
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_queue

Step 4: Create a Scala project - I am going to use IntelliJ IDEA because it’s what I know.

Step 5: Add dependencies to the build.sbt file


name := "sparkStreaming_kafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.1"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1"

libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"

Step 6: Creating the transaction generator


class TransactionGenerator(noOfCards: Int) {
  import java.util.{Calendar, Properties}
  import kafka.javaapi.producer.Producer
  import kafka.producer.{KeyedMessage, ProducerConfig}
  import scala.util.Random

  private def generateCardNumber: String = {
    val sb = new StringBuilder(16)
    for (i <- 0 until 16) {
      sb.append(Random.nextInt(10).toString)
    }
    sb.toString
  }

  val cards = for (i <- 0 until noOfCards) yield generateCardNumber

  def start(rate: Int): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    val config = new ProducerConfig(props)

    val producer = new Producer[String, String](config)

    while (true) {
      Thread.sleep(rate)
      val now = Calendar.getInstance.getTime.toString
      val card = cards(Random.nextInt(cards.length))
      val amount = Random.nextDouble() * 1000
      val message = new KeyedMessage[String, String]("kafka_queue", f"$now%s\t$card%s\t$amount%1.2f")
      producer.send(message)
    }
  }
}

Step 7: Driving the generator


object program {
  def main(args: Array[String]): Unit = {
    // how many transactions to create per second and for how many cards
    val transPerSec = 5
    val cards = 200
    val tranGen = new TransactionGenerator(cards)
    // start the generator
    tranGen.start(1000/transPerSec)
  }
}

Step 8: The fraud alerting service


package com.owenrumney.sparkstreaming

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

case class Transaction(date: String, cardNo: String, amount: Double)
case class Alert(cardNo: String, message: String)

class FraudAlertingService extends Serializable {

  def alert(alert: Alert): Unit = {
    println("%s: %s".format(alert.cardNo, alert.message))
  }
  def start() {
    val stream = new StreamingContext("local[2]", "TestObject", Seconds(10))
    val kafkaMessages: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(stream, "localhost:2181", "1", Map("kafka_queue" -> 1))

    kafkaMessages.window(Minutes(10), Seconds(10)).foreachRDD(rdd => rdd.map(record => {
      val components = record._2.split("\t")
      Transaction(components(0), components(1), components(2).toDouble)
    }).groupBy(transaction => transaction.cardNo)
      .map(groupedTransaction =>
      (groupedTransaction._1, groupedTransaction._2.map(transaction => transaction.amount).sum))
      .filter(m => m._2 > 10000)
      .foreach(t => alert(Alert(t._1, "Transaction amount exceeded"))))

    stream.start()
    stream.awaitTermination()
  }
}

Step 9: Driving the alerting service


import org.apache.log4j.Logger

object spark_program {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(org.apache.log4j.Level.ERROR)
    val faService = new FraudAlertingService
    faService.start()
  }
}

So that’s it - we’ll get a printed alert when the service picks up a card with over £10k in 10 minutes.

I know that the code isn’t great - I’m still working out Scala, so I will be improving on it where I can. My next post on the subject will move to a cloud implementation running over a multi-node cluster to see what I can learn from that.


AWS HTTPSConnectionPool max retries exceeded

I’m working with a new AWS account and I am moving on to testing Boto3 with the KMS service. I needed to make sure that the AWS access and secret keys were updated, so I ran aws configure to quickly update them.

I added the new keys, saw that the default region was set to [Ireland], so I accepted the default and ran the following code:

import boto3

s3 = boto3.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)

I was puzzled to get the following error:

botocore.vendored.requests.exceptions.ConnectionError: HTTPSConnectionPool(host='s3.ireland.amazonaws.com', port=443): Max retries exceeded with url: / (Caused by <class 'socket.gaierror'>: [Errno 8] nodename nor servname provided, or not known)

It didn’t sit right that the URL had ireland in it explicitly, when it’s generally the region code that is used with AWS, so I went back to aws configure and set eu-west-1 as the default region.

On rerunning the code it all worked, so it’s worth noting if this error comes up.
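
You can also pass the region explicitly in code rather than relying on the configured default - a small sketch of the same listing with the region supplied:

import boto3

# region_name overrides whatever aws configure has stored
s3 = boto3.resource('s3', region_name='eu-west-1')
for bucket in s3.buckets.all():
    print(bucket.name)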


Git hangs while unpacking objects (Windows)

I’m not sure if this is because we’re behind a proxy, the network has issues, or my work laptop isn’t great, but for some reason git clones very often hang during the unpacking of objects.

remote: Counting objects: 21, done.
remote: Total 21 (delta 0), reused 0 (delta 0)
Unpacking objects: 100% (21/21), done.

There is a way to recover from this: Ctrl+C to exit the git command, then cd into the folder you cloned into and run:

git fsck

notice: HEAD points to the unborn branch (master)
Checking object directories: 100% (256/256), done.
notice: No default references
dangling commit: 0a343894574c872348974a89347c387324324

The bit we’re interested in is the dangling commit; if we merge this commit manually, all will be fine:

git merge 0a343894574c872348974a89347c387324324

Job done, you should now have the completed clone.


Adventures with Spark, part one

For 18 months I’ve been working with Hadoop. Initially it was Hortonworks HDP on Windows, then Hortonworks HDP on CentOS, and for production we settled on Cloudera CDH5 on Red Hat. Recently we’ve been introduced to Spark and subsequently Scala, which I am now in the process of skilling up on - the plan is to blog as I learn.

For the first entries I imagine it won’t be much more than the basic tutorial you could read elsewhere, however the plan is to get more detailed as I learn more.

I can’t introduce Scala better than Scala School, so it’s worth taking a look at that.

I am going to use JetBrains IntelliJ IDEA for developing fuller applications, however for playing and learning you can download Spark for Hadoop in TAR format from the Spark Download Page and use the Spark shell.

For now I just extracted it to a folder in Downloads.

To start the Spark shell

$ cd ~/Downloads/spark-1.0.2-bin-hadoop2/bin
./spark-shell

One of the key parts of Spark is the SparkContext, which if you’ve done MapReduce will seem similar to the JobConf. The SparkContext has all the required information about where to run the work and the application details for viewing in the Spark UI web page.

In the Spark shell you can use the built-in SparkContext, sc:

scala> val sentence = "The quick brown fox jumps over the lazy dog"
scala> val words = sc.parallelize(sentence.split(" "))
scala> words.count() // should return 9
scala> words.filter(_.toLowerCase() == "the").count() // should return 2

All this is doing is creating a string, splitting it into words and creating a Spark RDD from it. We can use the action count() to find out how many words there are, and we can use filter() to create a new RDD with filtered results (in this case, filtering to the word ‘the’).