Combining rows into an array in PySpark

Overview

I’ve just spent a bit of time trying to work out how to group a Spark DataFrame by a given column and then aggregate the rows into a single ArrayType column.

Given the input:

transaction_id | item
-------------- | ----
1              | a
1              | b
1              | c
1              | d
2              | a
2              | d
3              | c
4              | b
4              | c
4              | d

I want to turn that into the following:

transaction_id | items
-------------- | ------------
1              | [a, b, c, d]
2              | [a, d]
3              | [c]
4              | [b, c, d]

To achieve this, I can use the following:

from pyspark.sql.functions import collect_list

df = spark.sql('select transaction_id, item from transaction_data')

grouped_transactions = df.groupBy('transaction_id').agg(collect_list('item').alias('items'))
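
If you don’t have a transaction_data table to hand, here is a minimal, self-contained sketch of the same aggregation built from an in-memory DataFrame instead of Spark SQL (the app name is just for illustration). Note that collect_list doesn’t guarantee the order of items within each array.

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list

# Create a session and build the example input as an in-memory DataFrame
spark = SparkSession.builder.appName("collect-list-example").getOrCreate()

df = spark.createDataFrame(
    [(1, "a"), (1, "b"), (1, "c"), (1, "d"),
     (2, "a"), (2, "d"),
     (3, "c"),
     (4, "b"), (4, "c"), (4, "d")],
    ["transaction_id", "item"],
)

# Group by transaction_id and roll the items up into an ArrayType column
grouped_transactions = (
    df.groupBy("transaction_id")
      .agg(collect_list("item").alias("items"))
)

grouped_transactions.show(truncate=False)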

Testing private methods with ScalaTest

Overview

As part of my journey into using Scala I have had to get used to ScalaTest and the wealth of functionality it offers.

One of the enduring headaches with unit testing is finding a clean way to test private methods without being left feeling that you’ve somehow compromised the solution in order to test it fully.

Example

I’ve used an example which is reasonably common, so it’s easy to see the usefulness of the PrivateMethodTester trait.

The example is that of a file loader where the source might be local, S3 or similar. In this case, I’m going to have a public method on my ObjectWithPrivate Scala object; this method will accept a String for the sourcePath of a file whose content I want to load as a BufferedSource.

The sourcePath may be local, or it may be S3, but as the consumer I don’t really want to care. The logical thing in this situation is to have the implementation details of loading the file hidden in private methods. These methods will attempt to load the file from their respective sources and throw a FileNotFoundException if it isn’t available.

import java.io.FileNotFoundException

import com.amazonaws.AmazonServiceException
import com.amazonaws.services.s3.model.S3Object
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder, AmazonS3URI}
import org.slf4j.{Logger, LoggerFactory}

import scala.io.{BufferedSource, Source}
import scala.reflect.io.File

object ObjectWithPrivate {

  val logger: Logger = LoggerFactory.getLogger("ObjectWithPrivate")

  def loadFromPath(sourcePath: String): BufferedSource = {
    sourcePath match {
      case s if s.startsWith("s3") => loadFromS3(sourcePath)
      case _                       => loadFromLocal(sourcePath)
    }
  }

  private def loadFromS3(sourcePath: String,
                         s3Client: AmazonS3 = AmazonS3ClientBuilder.defaultClient()): BufferedSource = {
    val uri: AmazonS3URI = new AmazonS3URI(sourcePath)
    try {
      val s3Object: S3Object = s3Client.getObject(uri.getBucket, uri.getKey)
      Source.fromInputStream(s3Object.getObjectContent)
    } catch {
      case aex: AmazonServiceException => {
        if (aex.getStatusCode == 404) {
          throw new FileNotFoundException(s"file not found: $sourcePath")
        }
        throw aex
      }
    }
  }

  private def loadFromLocal(sourcePath: String): BufferedSource = {
    logger.info(s"Loading config from local File: $sourcePath")
    if (!File(sourcePath).exists) {
      throw new FileNotFoundException(s"Config file not found: $sourcePath")
    }
    val bufferedSource = Source.fromFile(sourcePath)
    bufferedSource
  }

}

The difficulty now comes in testing the private methods. Testing the local load can be done by calling the public loadFromPath method, but that won’t work for the loadFromS3 method, as this needs S3 mocking to test adequately without requiring connectivity to S3 and a known file guaranteed to be present.

This is where the PrivateMethodTester trait comes in. By mixing this trait into our ScalaTest class, we can invoke a private method on our object. I’ve included the whole test class because it has all the setup for the S3 mock (I see little point in creating an example that calls S3 and then not including the information needed to replicate it).

import com.amazonaws.auth.{AWSStaticCredentialsProvider, AnonymousAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import io.findify.s3mock.S3Mock
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, PrivateMethodTester}

import scala.io.BufferedSource

class ObjectWithPrivateTest extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with PrivateMethodTester {

  val endpoint: AwsClientBuilder.EndpointConfiguration = new AwsClientBuilder.EndpointConfiguration(
      "http://localhost:8001",
      "eu-west-1"
    )
  val credentials = new AWSStaticCredentialsProvider(new AnonymousAWSCredentials)
  val api: S3Mock = new S3Mock.Builder()
                        .withPort(8001)
                        .withInMemoryBackend.build
  api.start

  override def beforeEach(): Unit = {
    val client = AmazonS3ClientBuilder.standard
      .withPathStyleAccessEnabled(true)
      .withEndpointConfiguration(endpoint)
      .withCredentials(credentials)
      .build
    client.createBucket("testbucket")
    client.putObject("testbucket", "files/file1", "file1_content")
  }

  override def afterAll(): Unit = {
    api.stop
  }

  test("ObjectWithPrivate loads a test file from S3") {
    val client = AmazonS3ClientBuilder.standard
      .withPathStyleAccessEnabled(true)
      .withEndpointConfiguration(endpoint)
      .withCredentials(credentials)
      .build

    val loadFromS3 = PrivateMethod[BufferedSource]('loadFromS3)
    val content = ObjectWithPrivate invokePrivate loadFromS3(
      "s3://testbucket/files/file1",
      client
    )
    content.mkString shouldBe "file1_content"
  }
}

// further tests for local omitted

In the test, the key part is the following line:

val loadFromS3 = PrivateMethod[BufferedSource]('loadFromS3)

This creates a PrivateMethod object, typed to return a BufferedSource, to which we pass the name of the method to be called as a Symbol. One of the features added by PrivateMethodTester is the invokePrivate method, which we can use to call the private method on a given object (or an instance of a class, for that matter).

val content = ObjectWithPrivate invokePrivate loadFromS3(
  "s3://testbucket/files/file1",
  client
)

This calls the private method, returning our BufferedSource, and I can test that the content of the mocked S3 object is in fact file1_content.

For interest, here is the build.sbt for this simple project:

name := "PrivateMethodTester"

version := "0.1"

scalaVersion := "2.12.8"

// dependencies versions
val amazonSdkVersion = "1.11.540"
val logbackClassicVersion = "1.2.3"
val s3MockVersion = "0.2.4"
val scalaTestVersion = "3.0.5"
val slf4jVersion = "1.7.25"

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk-core" % amazonSdkVersion,
  "com.amazonaws" % "aws-java-sdk-s3" % amazonSdkVersion,
  "org.slf4j" % "slf4j-api" % slf4jVersion,
  "ch.qos.logback" % "logback-classic" % logbackClassicVersion,
  "org.scalatest" %% "scalatest" % scalaTestVersion,
  "io.findify" %% "s3mock" % s3MockVersion % Test
)

Databricks Single SignOn with Azure Active Directory

Overview

At my current workplace we are using Databricks with much success. Having recently activated the Security Operations Package I was keen to implement the Single SignOn (SSO) functionality.

The documentation provided by Databricks doesn’t seem to cover integrating with Azure Active Directory as a SAML 2.0 Identity Provider and it took some effort to work out how to do it.

Simple Steps

  1. Log into the Azure Portal and, from the menu on the left, select Azure Active Directory, then Enterprise applications from the secondary menu. (screenshot: Azure Active Directory - Enterprise Apps)

  2. Select New Application to create a new Enterprise application. (screenshot: Azure Active Directory - New App)

  3. Databricks isn’t one of the Gallery Applications at the time of writing, so select Non-Gallery Application from the available list. (screenshot: Azure Active Directory - Non Gallery Application)

  4. This is where the Databricks instructions are unclear: you need to use your Databricks URL as the Identity Provider Entity ID. (screenshot: Azure Active Directory - Basic SAML Settings)

  5. When you’ve completed and saved the basic settings, you’ll be able to download the x.509 certificate and have access to the Login URL to use in the Databricks Admin Console. Download the cert and open it with a text editor to extract the certificate content. (screenshot: Azure Active Directory - Cert and Login)

  6. You can now take these details over to the Databricks admin console to configure SSO. Enter the details into the Single Sign On tab in the Admin Console page. Your Identity Provider Entity ID is the root of your Databricks cloud URL. (screenshot: Databricks Admin Console - SSO)

  7. You can now log out, then log in using Single SignOn through Azure, which should get you straight back in. (screenshot: Databricks Admin Console - SSO Login)

A Note on Allow User Creation

If you enable Allow auto user creation, the user will be created automatically when they first log in. This is fine if you’ve configured Azure Active Directory so that only users with a Role can use the Enterprise Application. For our use case, I’ve gone with this option disabled and enabled open access at the Active Directory end. This means that unknown (from a Databricks perspective) but otherwise authenticated users don’t have access to the environment.


CIDR Cheatsheet for available IP Addresses

This is less of a post and more of a CIDR to mask cheatsheet to remind me how many IPs I get for a given block.

CIDR | Subnet Mask     | Total IPs     | Usable IPs
---- | --------------- | ------------- | -------------
/32  | 255.255.255.255 | 1             | 1
/31  | 255.255.255.254 | 2             | 0
/30  | 255.255.255.252 | 4             | 2
/29  | 255.255.255.248 | 8             | 6
/28  | 255.255.255.240 | 16            | 14
/27  | 255.255.255.224 | 32            | 30
/26  | 255.255.255.192 | 64            | 62
/25  | 255.255.255.128 | 128           | 126
/24  | 255.255.255.0   | 256           | 254
/23  | 255.255.254.0   | 512           | 510
/22  | 255.255.252.0   | 1,024         | 1,022
/21  | 255.255.248.0   | 2,048         | 2,046
/20  | 255.255.240.0   | 4,096         | 4,094
/19  | 255.255.224.0   | 8,192         | 8,190
/18  | 255.255.192.0   | 16,384        | 16,382
/17  | 255.255.128.0   | 32,768        | 32,766
/16  | 255.255.0.0     | 65,536        | 65,534
/15  | 255.254.0.0     | 131,072       | 131,070
/14  | 255.252.0.0     | 262,144       | 262,142
/13  | 255.248.0.0     | 524,288       | 524,286
/12  | 255.240.0.0     | 1,048,576     | 1,048,574
/11  | 255.224.0.0     | 2,097,152     | 2,097,150
/10  | 255.192.0.0     | 4,194,304     | 4,194,302
/9   | 255.128.0.0     | 8,388,608     | 8,388,606
/8   | 255.0.0.0       | 16,777,216    | 16,777,214
/7   | 254.0.0.0       | 33,554,432    | 33,554,430
/6   | 252.0.0.0       | 67,108,864    | 67,108,862
/5   | 248.0.0.0       | 134,217,728   | 134,217,726
/4   | 240.0.0.0       | 268,435,456   | 268,435,454
/3   | 224.0.0.0       | 536,870,912   | 536,870,910
/2   | 192.0.0.0       | 1,073,741,824 | 1,073,741,822
/1   | 128.0.0.0       | 2,147,483,648 | 2,147,483,646
/0   | 0.0.0.0         | 4,294,967,296 | 4,294,967,294
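
The Total and Usable columns follow directly from the prefix length: a /n block contains 2^(32 - n) addresses, and for anything larger than a /31 you lose the network and broadcast addresses. As a quick sanity check, here is a small Python sketch that reproduces those two columns (the cidr_sizes function name is just mine for illustration):

def cidr_sizes(prefix):
    """Total and usable IPv4 addresses for a /prefix block.

    Follows the convention in the table above: usable = total - 2
    (network + broadcast), with /31 treated as 0 usable and /32 as
    a single host route with 1 usable address.
    """
    total = 2 ** (32 - prefix)
    if prefix == 32:
        return total, 1
    if prefix == 31:
        return total, 0
    return total, total - 2


for prefix in (32, 31, 24, 16, 8, 0):
    total, usable = cidr_sizes(prefix)
    print(f"/{prefix}: total={total:,} usable={usable:,}")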

git alias for concise history

This is a pretty short post. H/T to @prokopp for telling me about this.

Git allows you to add aliases in your global config - this is the first one I’ve actually added, and all it does is produce a concise, clearly formatted git log.

To add it to your git global config, just run the command below in your terminal:

git config --global alias.hist "log --pretty=format:'%h %ad | %s%d [%an]' --graph --date=short"
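
For reference, that command simply writes an [alias] entry into your global ~/.gitconfig, so you could equally add it there by hand; the resulting entry looks something like this:

[alias]
    hist = log --pretty=format:'%h %ad | %s%d [%an]' --graph --date=short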

An example of the output from this site’s repository in my GitHub account is below.

* 8295bc3 2019-03-13 | update the truncator (HEAD -> master, origin/master, origin/HEAD) [owenrumney]
* a014dcc 2019-03-13 | update the diagram for the aws glossary site [owenrumney]
* 608f994 2019-03-01 | Update the aws-glossary diagram [owenrumney]
* a5c519c 2019-02-27 | update disqus [owenrumney]
* f1cb250 2019-02-27 | update image [owenrumney]
* 3b24958 2019-02-27 | add the new post [owenrumney]
* b98a6c2 2019-02-21 | add the announcement page [owenrumney]
* 50b4613 2019-02-21 | Moving AWS Link [owenrumney]
* 9d3530d 2019-02-21 | update with filtering [owenrumney]
* 13d7220 2019-02-21 | update with filtering [owenrumney]
* 7c75ac4 2019-02-21 | Add the category filtering to the aws services page [owenrumney]
* 8a48b45 2019-02-21 | update the aws services page [owenrumney]
* 4fd4bcb 2019-02-20 | update aws services [owenrumney]
* 2dfa138 2019-02-20 | add the aws service page [owenrumney]
* a479a28 2019-02-20 | add AWS Services [owenrumney]
* a123376 2019-02-16 | Add lightsout post [owenrumney]
* b9f95a2 2019-02-16 | update the CNAME [owenrumney]
* edd7bac 2019-02-16 | update the CNAME [owenrumney]
* bbe3221 2019-02-16 | update the excerpt on the home page [owenrumney]
* 9fc9402 2019-01-28 | Update post name [owenrumney]
* a2e68a2 2019-01-28 | Add the images [owenrumney]
... cntd

Now I need to dream up some other aliases….