Writing a Flume Interceptor

Here we are in June, some five months since the last post, and I finally have some time and content to sit down and write a post.

In April 2013 I started working with Hadoop; the plan was to pull in server application logs to determine who was using what data within the business, to make sure it was being correctly accounted for. At the time, Flume seemed like the obvious choice to ingest these files, until we realised the timing, format and frequency made Flume feel like overkill. As it happened, it was discounted before I could get my teeth into it.

Two years later there is finally a clear use case for Flume: high volumes of regularly generated XML files which need ingesting into HDFS for processing.

There are two key requirements for this piece: first, that the file name be preserved somehow, and second, that the content be converted to JSON in flight. For this post I'm going to focus only on the former.

When setting up the configuration for the Flume agent, the Spooling Directory Source can be configured with fileHeader = true, which adds the full path of the originating file to the event headers where it can be used by an interceptor. This value can be appended to the destination path in HDFS, but as it contains the complete originating path the files would land in a structure mirroring the source; in our case that isn't desirable.

To solve this, I'm writing an interceptor which will mutate the header value down to just the filename with no extension.

Creating the interceptor requires a number of steps.

First, we need to import the required Flume dependency:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.5.0</version>
</dependency>

Then we need to create an abstract class implementing Interceptor, which will be used as a base for future interceptors.

import java.util.Iterator;
import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public abstract class AbstractFlumeInterceptor implements Interceptor {

    public void initialize() {    }

    // concrete interceptors implement the per-event logic; returning null drops the event
    public abstract Event intercept(Event event);

    public List<Event> intercept(List<Event> events) {
        for (Iterator<Event> eventIterator = events.iterator(); eventIterator.hasNext(); ) {
            Event next = intercept(eventIterator.next());
            if (next == null) {
                eventIterator.remove();
            }
        }
        return events;
    }

    public void close() {    }
}

Now that we have this class wrapping up the logic of handling a list of Events, we can create the concrete class, FilenameInterceptor, which extends it and overrides intercept. The header name to work on is passed in through the constructor.

public class FilenameInterceptor extends AbstractFlumeInterceptor {

    private final String header;

    public FilenameInterceptor(String header) {
        this.header = header;
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String headerValue = headers.get(header); // header in this case is 'file' as per the config
        if (headerValue == null) {
            headerValue = "";
        }
        Path path = Paths.get(headerValue);
        if (path.getFileName() != null) {
            headerValue = FilenameUtils.removeExtension(path.getFileName().toString());
        }
        headers.put(header, headerValue);
        return event;
    }
}

The Flume conf file references a Builder to construct the interceptor, so the following nested static class is added inside FilenameInterceptor.

public static class Builder implements Interceptor.Builder {
    // default to 'file', the header key the Spooling Directory Source adds when fileHeader = true
    private String headerkey = "file";

    public Interceptor build() {
        return new FilenameInterceptor(headerkey);
    }

    public void configure(Context context) {
        headerkey = context.getString("key", headerkey);
    }
}

With all this in place we can run mvn clean package and copy the jar to the Flume lib folder. In my case we're using Cloudera, so it goes in the parcels folder /opt/cloudera/parcels/CDHxxx/flume-ng/lib; from there it will be picked up when flume-ng starts.

The new additions to the conf file are:

# ... source1 props ...
agent1.sources.source1.fileHeader = true
agent1.sources.source1.interceptors = interceptor1
agent1.sources.source1.interceptors.interceptor1.type = [package].[for].[Interceptor].FilenameInterceptor$Builder
# ... hdfs1 props ...
agent1.sinks.hdfs1.filePrefix = %{file}

Quick introduction to pyspark

All the work I have been doing with AWS has been using Python, specifically boto3, the rework of boto.

One of the intentions is to limit bandwidth when transferring data to S3: the idea is to send periodic snapshots, then daily deltas which are merged to form a 'latest' folder, so a diff/merge mechanism is needed. I originally implemented this in Scala as a Spark process, but in an effort to settle on one language I'm looking to redo it in Python using pyspark (there's a rough sketch of the merge at the end of this post).

I'm using my MacBook, and to keep things quick and easy I'm going to download a prebuilt Spark package for Hadoop and drop it in /usr/share:

wget http://archive.apache.org/dist/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz
tar -xvf spark-1.0.2-bin-hadoop2.tgz
mv spark-1.0.2-bin-hadoop2 /usr/share/spark-hadoop

I'm going to create a folder to do my dev in under my home folder; to keep things clean I like to use virtualenv:

cd ~/dev
virtualenv pyspark
cd pyspark

To run pyspark inside IPython Notebook we need to start it with IPYTHON_OPTS set:

IPYTHON_OPTS="notebook" /usr/share/spark-hadoop/bin/pyspark

This opens IPython notebook in the default browser.

Finally, a quick and dirty demo with word count

file = sc.textFile("/data/bigtextfile.txt")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("/data/bigtextfile_counts")  # write the results to a separate output directory
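
As a taste of where this is heading, here's a rough sketch of what the snapshot/delta merge might look like in pyspark. The paths, the tab-separated layout and the rule that a delta record simply replaces a snapshot record with the same key are all assumptions for illustration (deletes aren't handled).

# hypothetical layout: tab-separated records where the first field is a unique key
def keyed(line):
    key, _, value = line.partition("\t")
    return (key, value)

snapshot = sc.textFile("/data/snapshot").map(keyed)
delta = sc.textFile("/data/delta").map(keyed)

# keep the snapshot records the delta doesn't replace, then add everything from the delta
latest = snapshot.subtractByKey(delta).union(delta)

latest.map(lambda kv: kv[0] + "\t" + kv[1]).saveAsTextFile("/data/latest")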

Client side encryption using Boto3 and AWS KMS

Towards the end of 2014 Amazon released the KMS service, a cheaper, cut-down key management offering compared with the CloudHSM solution (although it still uses hardware HSMs underneath).

The KMS service is accessed through the IAM console; the bottom option on the left-hand menu is Encryption Keys. Make sure you change the region filter to the correct region before creating or trying to view your customer keys.

To create the customer key, click the Create Key button and follow the instructions to create a new master key. Take a note of the Key ID and you're ready to go.

You need a couple of libraries before you start; for testing I use virtualenv:

bin/pip install boto3

bin/pip install pycrypto

## Encrypting

I'm using the PyCrypto library for no other reason than it appeared most often in the results when I was searching for a library.

I won't go into much detail on the code because I don't know much about encryption; I cobbled this together from the information on the PyCrypto page.

The key that is going to be supplied is the data key generated from the AWS key management service.

from Crypto import Random
from Crypto.Cipher import AES

def pad(s):
    # pad with zero bytes up to a multiple of the AES block size
    return s + b"\0" * (AES.block_size - len(s) % AES.block_size)

def encrypt(message, key, key_size=256):
    message = pad(message)
    iv = Random.new().read(AES.block_size)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return iv + cipher.encrypt(message)

def decrypt(ciphertext, key):
    iv = ciphertext[:AES.block_size]
    cipher = AES.new(key, AES.MODE_CBC, iv)
    plaintext = cipher.decrypt(ciphertext[AES.block_size:])
    return plaintext.rstrip(b"\0")

def encrypt_file(file_name, key):
    with open(file_name, 'rb') as fo:
        plaintext = fo.read()
    enc = encrypt(plaintext, key)
    with open(file_name + ".enc", 'wb') as fo:
        fo.write(enc)

def decrypt_file(file_name, key):
    with open(file_name, 'rb') as fo:
        ciphertext = fo.read()
    dec = decrypt(ciphertext, key)
    with open(file_name[:-4], 'wb') as fo:
        fo.write(dec)
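
Before bringing KMS into it, a quick round trip with a locally generated key is a handy sanity check; os.urandom stands in here for the data key that KMS will provide below.

import os

key = os.urandom(32)  # a 256-bit stand-in for the KMS data key used later
secret = encrypt(b"some test message", key)
print(decrypt(secret, key))  # prints the original message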

## Creating the data key to encrypt

For each item I want to encrypt I am going to create a new data key; this is a key generated by KMS and encrypted with the customer's master key.

The call to the API returns both the plaintext key and the ciphered version for storage alongside the encrypted file (in the case of S3 you could store the base64-encoded version in a metadata field).

In this code, customer_key is the KeyId from the AWS console for the key you created at the start; it's a GUID.

import boto3

kms = boto3.client('kms')
data_key_req = kms.generate_data_key(KeyId=customer_key, KeySpec='AES_256')
data_key = data_key_req['Plaintext']
data_key_ciphered = data_key_req['CiphertextBlob']

encrypt_file(filepath, data_key)

This will create a new encrypted copy of the file; for test.txt it would create test.txt.enc.

If you were going to upload to S3, you might use something like:

import base64

s3 = boto3.client('s3')
s3.put_object(Bucket='mybucketname',
              Body=open('test.txt.enc', 'rb'),
              Key='test.txt',
              Metadata={'encryption-key': base64.b64encode(data_key_ciphered)})
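
Going the other way is worth a quick sketch too: pull the object back, unwrap the data key with KMS, and reuse the decrypt function from earlier. The bucket and object names are just the illustrative ones from above.

import base64

import boto3

s3 = boto3.client('s3')
kms = boto3.client('kms')

# fetch the encrypted object and the wrapped data key stored in its metadata
obj = s3.get_object(Bucket='mybucketname', Key='test.txt')
ciphered_key = base64.b64decode(obj['Metadata']['encryption-key'])

# ask KMS to unwrap the data key, then decrypt the payload locally
data_key = kms.decrypt(CiphertextBlob=ciphered_key)['Plaintext']
plaintext = decrypt(obj['Body'].read(), data_key)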


Adventures with Spark, part two

Some time ago, back in September, I wrote a post on starting my adventures with Spark but didn’t progress things very far.

One thing that was holding me back was a reasonably real-world problem to use as a learning case. I recently came across a question which seemed like a good starting point, and for the last few evenings I have been working on a solution.

The problem

A credit card company is receiving transaction data from around the world and needs to be able to spot fraudulent usage from the transactions.

To simplify this use case, I’m going to pick one fabricated indicator of fraudulent usage and focus on that.

  • An alert must be raised if a credit card makes £10,000 of purchases within a 10 minute sliding window

For the purposes of this learning project I am going to assume the following:

  • There is a high volume of transactions
  • No data needs to be retained
  • Once an alert has been raised, a black box system will react to it

The solution

From the outset, this problem seems perfectly suited to Spark Streaming, and with the high volume it's going to need a queue to manage the incoming transaction data.

I’m going to create a basic producer to pump transactions into Kafka to simulate the inbound transactions.

I don't want to detail the process of installing Kafka and getting Spark set up; I'm using a MacBook and used brew to get everything installed. I'm using SBT for the solution, which can be found on GitHub.

Step 1: Start ZooKeeper for Kafka


# in my case $KAFKA_HOME = /usr/local/Cellar/kafka_2.10-0.8.1.1/
cd $KAFKA_HOME
./bin/zookeeper-server-start.sh config/zookeeper.properties

Step 2: Start the Kafka server


cd $KAFKA_HOME
./bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

Step 3: Create the Kafka topic


cd $KAFKA_HOME
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_queue

Step 4: Create a Scala project - I am going to use IntelliJ IDEA because it’s what I know.

Step 5: Add dependencies to the build.sbt file


name := "sparkStreaming_kafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.1"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1"

libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"

Step 6: Creating the transaction generator


class TransactionGenerator(noOfCards: Int) {
  import java.util.{Calendar, Properties}
  import kafka.javaapi.producer.Producer
  import kafka.producer.{KeyedMessage, ProducerConfig}
  import scala.util.Random

  private def generateCardNumber: String = {
    val sb = new StringBuilder(16)
    for (i <- 0 until 16) {
      sb.append(Random.nextInt(10).toString)
    }
    return sb.toString
  }

  val cards = for (i <- 0 until noOfCards) yield generateCardNumber

  def start(rate: Int): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    val config = new ProducerConfig(props)

    val producer = new Producer[String, String](config)

    while (true) {
      Thread.sleep(rate)
      val now = Calendar.getInstance.getTime.toString
      val card = cards(Random.nextInt(cards.length))
      val amount = Random.nextDouble() * 1000
      val message = new KeyedMessage[String, String]("kafka_queue", f"$now%s\t$card%s\t$amount%1.2f")
      producer.send(message)
    }
  }
}

Step 7: Driving the generator


object program {
  def main(args: Array[String]): Unit = {
  	// how many transactions to create a second and for how many cards
    val transPerSec = 5
    val cards = 200
    val tranGen = new TransactionGenerator(cards)
    // start the generator
    tranGen.start(1000/transPerSec)
  }
}

Step 8: The fraud alerting service


package com.owenrumney.sparkstreaming

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

case class Transaction(date: String, cardNo: String, amount: Double)
case class Alert(cardNo: String, message: String)

class FraudAlertingService extends Serializable {

  def alert(alert: Alert): Unit = {
    println("%s: %s".format(alert.cardNo, alert.message))
  }
  def start() {
    val stream = new StreamingContext("local[2]", "TestObject", Seconds(10))
    val kafkaMessages: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(stream, "localhost:2181", "1", Map("kafka_queue" -> 1))

    kafkaMessages.window(Minutes(10), Seconds(10)).foreachRDD(rdd => rdd.map(record => {
      val components = record._2.split("\t")
      Transaction(components(0), components(1), components(2).toDouble)
    }).groupBy(transaction => transaction.cardNo)
      .map(groupedTransaction =>
      (groupedTransaction._1, groupedTransaction._2.map(transaction => transaction.amount).sum))
      .filter(m => m._2 > 10000)
      .foreach(t => alert(Alert(t._1, "Transaction amount exceeded"))))

    stream.start()
    stream.awaitTermination()
  }
}

Step 9: Start the alerting service


import org.apache.log4j.Logger

object spark_program {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(org.apache.log4j.Level.ERROR)
    val faService = new FraudAlertingService
    faService.start()
  }
}

So that's it: we'll get a printed alert when the service picks up a card with over £10k of transactions in 10 minutes.

I know that the code isn't great - I'm still working out Scala, so I will be improving on it where I can. My next post on the subject will move to a cloud implementation running over a multi-node cluster to see what I can learn from that.


AWS HTTPSConnectionPool max retries exceeded

I'm working with a new AWS account and moving on to testing Boto3 with the KMS service. I needed to make sure that the access and secret keys were updated, so I ran aws configure to quickly update them.

I added the new keys, saw that the default region was set to [Ireland], so I accepted the default and ran the following code:

import boto3

s3 = boto3.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)

I was puzzled to get the following error:

botocore.vendored.requests.exceptions.ConnectionError: HTTPSConnectionPool(host='s3.ireland.amazonaws.com', port=443): Max retries exceeded with url: / (Caused by <class 'socket.gaierror'>: [Errno 8] nodename nor servname provided, or not known)

It didn't sit right that the URL had 'ireland' in it explicitly, when it's generally the region code that is used with AWS, so I went back to aws configure and set eu-west-1 as the default region.

On rerunning the code it all worked, so it's worth noting if this error comes up.
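
For completeness, the region can also be pinned in code when creating the resource, which avoids relying on the shared config at all; a minimal sketch:

import boto3

# pass the region code explicitly rather than depending on aws configure
s3 = boto3.resource('s3', region_name='eu-west-1')
for bucket in s3.buckets.all():
    print(bucket.name)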