Azure Databricks – Transforming Data Frames in Spark


In previous weeks, we’ve looked at Azure Databricks, Azure’s managed Spark cluster service.

We then looked at Resilient Distributed Datasets (RDDs) & Spark SQL / Data Frames.

We wanted to look at Data Frames some more, with a bigger data set, and more precisely at some transformation techniques.  We often say that most of the leg work in machine learning is data cleansing.  Similarly, the clever & insightful aggregation query performed on a large dataset can usually only be executed after a considerable amount of formatting, filtering & massaging of the data:  data wrangling.

Here, we’ll look at an interesting dataset, the H-1B Visa Petitions 2011-2016 (from Kaggle) and find some good insights with just a few queries, but also some data wrangling.

It is important to note that almost everything in this article isn’t specific to Azure Databricks and would work with any distribution of Apache Spark.

The notebook used for this article is persisted on GitHub (see this article on how to import it in your Workspace).

The Data

From Kaggle:

H-1B is an employment-based, non-immigrant visa category for temporary foreign workers in the United States. For a foreign national to apply for H1-B visa, an US employer must offer a job and petition for H-1B visa with the US immigration department. This is the most common visa status applied for and held by international students once they complete college/ higher education (Masters, PhD) and work in a full-time position.

Kaggle offers data versioning so we didn’t feel we needed to copy the dataset into GitHub.  We used Version 2 of the dataset.

The data set is contained in a single CSV file of 0.5GB.  It has few columns but a large number of rows (3 million):

[Image: preview of the dataset’s columns]

The Challenges

We can notice a few cosmetic aspects that warrant some light wrangling:

  • The ID column simply doesn’t have a name in the CSV file
  • The WORKSITE column contains a city and a state separated by a comma; this is denormalized data and it can be useful to look at state or city separately (and we will)
  • The FULL_TIME_POSITION column has values ‘Y’ and ‘N’; it should be a boolean

Wrangling

As in past articles, we use the Python SDK.  We make sure our cluster is configured with the credentials of the storage account where we copied the data.

Then, we simply point to the file in the storage account.


#  Replace with your container and storage account:  "wasbs://<container>@<storage account>.blob.core.windows.net/"
pathPrefix = "wasbs://hb1-visa@vpldb.blob.core.windows.net/"
path = pathPrefix + "h1b_kaggle.csv"
# Load CSV
df = spark.read.option("header","true").csv(path)

Here we use the Spark session to load the data as a Data Frame.  The Spark context is used to manipulate RDDs while the session is used for Spark SQL.  Each interface offers different load methods, with the Spark session offering the higher-level ones.
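To make the difference concrete, here is a minimal sketch (not part of the original notebook) contrasting the two:  the context reads raw text lines into an RDD, while the session’s reader parses the file into a Data Frame.


# The Spark context loads raw text lines as an RDD of strings...
linesRdd = spark.sparkContext.textFile(path)
print(linesRdd.first())   # prints the raw header line
# ...while the Spark session's reader parses the file into a Data Frame
csvDf = spark.read.option("header", "true").csv(path)
csvDf.show(5)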

Here we load the file as a CSV, using its header row for column names.  Since we don’t set the inferSchema option, every column is read as a string, which is fine for the wrangling we do next.
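If typed columns mattered here, we could ask the reader to infer the schema instead, at the cost of an extra pass over the data.  A quick sketch:


# Optional: let Spark infer column types (this scans the data a second time)
typedDf = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(path)
typedDf.printSchema()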

CSV load works well but we want to rework some columns.  For that we’ll flip back to an RDD representation.  To pass from a Data Frame df to its RDD representation we can simply use df.rdd.

That RDD will be an RDD of Row (i.e. strongly typed).  A Row is a read-only object, which makes it cumbersome to manipulate since we would need to repeat each existing column.  An easier way is to work with a Python dictionary representation of the row.

So we define the following Python method to manipulate a row:


# Manipulate a data row to change some columns
def reworkRow(row):
    from pyspark.sql import Row

    # Let's convert the data row to a dictionary
    # This is easier to manipulate as the dictionary isn't read-only
    dict = row.asDict()

    # Rename the first column from _c0 to id
    dict['id'] = dict['_c0']
    del(dict['_c0'])

    # Split the WORKSITE column into city & state
    worksite = dict['WORKSITE'].split(',')
    city = worksite[0].strip() if len(worksite) > 0 else None
    state = worksite[1].strip() if len(worksite) > 1 else None
    dict['CITY'] = city
    dict['STATE'] = state
    del(dict['WORKSITE'])

    # Change the FULL_TIME_POSITION column from 'Y' / 'N' to True / False (boolean)
    dict['FULL_TIME_POSITION'] = True if dict['FULL_TIME_POSITION'] == 'Y' else False

    return Row(**dict)

This addresses the three changes we identified:  the id column, the worksite column & the full-time position column.

We can then simply do a map on the RDD and recreate a data frame from the mapped RDD:


# Convert back to RDD to manipulate the rows
rdd = df.rdd.map(lambda row: reworkRow(row))
# Create a dataframe with the manipulated rows
hb1 = spark.createDataFrame(rdd)
# Let's cache this bad boy
hb1.cache()
# Create a temporary view from the data frame
hb1.createOrReplaceTempView("hb1")

We cached the data frame.  Since the data set is 0.5GB on disk, it is useful to keep it in memory.  The .cache method does a best-effort job of keeping the data in the RAM of the worker nodes.  This means that from one query to the next, the data frame isn’t fully reconstructed.  It improves performance drastically, as we can easily verify by commenting that line out.
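If we wanted finer control than .cache, we could, for example, persist with an explicit storage level and release the worker memory once we’re done.  A small sketch:


from pyspark import StorageLevel

# Similar to .cache(), but with an explicit storage level
# (for data frames, .cache() defaults to MEMORY_AND_DISK)
hb1.persist(StorageLevel.MEMORY_ONLY)
print(hb1.is_cached)    # True once the data frame is marked for caching

# When we're done with the data frame, free the worker memory
hb1.unpersist()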

We also register the data frame as a temporary view so we can use it in SQL.

This is the extent of the wrangling we do with this data set.  Nothing very complicated but, as is often the case, we do more than just load the CSV and query it.

Insights

First, let’s check the size of the data set:


%sql
SELECT COUNT(*)
FROM hb1

This gives us a count of 3,002,458, which is a good-sized data set.
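The same number can be read straight from the cached data frame, without going through the SQL view:


# Same count through the Data Frame API
print(hb1.count())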

We can then check which states are most popular for immigration requests in the USA:


%sql
SELECT STATE, COUNT(*) AS petitions
FROM hb1
GROUP BY STATE
ORDER BY petitions DESC

The result is our first insight:

[Image: petition counts by state]

California being on top isn’t surprising.  Texas being second is a little more interesting, although since it is followed by New York & New Jersey, one could argue those two are both driven by New York City’s pull, in which case they would together be 2nd.
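As an aside, the same aggregation could be written against the data frame directly instead of SQL.  An equivalent sketch:


# Equivalent of the state aggregation using the Data Frame API
from pyspark.sql.functions import count, desc

petitionsByState = hb1.groupBy("STATE") \
    .agg(count("*").alias("petitions")) \
    .orderBy(desc("petitions"))
petitionsByState.show(10)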

If we break down by city, a different immigration portrait appears:


%sql
SELECT CITY, STATE, COUNT(*) AS petitions
FROM hb1
GROUP BY CITY, STATE
ORDER BY petitions DESC

[Image: petition counts by city and state]

Indeed, New York takes the lion’s share and Houston comes 2nd.  Atlanta comes 4th while the state of Georgia was 9th.

This is a great example of drilling down to better understand the data.

We could have a look at the distribution of the case status:


%sql
SELECT CASE_STATUS, COUNT(*) AS petitions
FROM hb1
GROUP BY CASE_STATUS
ORDER BY petitions DESC

[Image: petition counts by case status]

A surprisingly low level of rejection.

Let’s look at the employers sponsoring those visas.


%sql
SELECT EMPLOYER_NAME, COUNT(*) as count
FROM hb1
GROUP BY EMPLOYER_NAME
ORDER BY count DESC
LIMIT 20

We see that the list of top employers is dominated by the IT industry.

[Image: top 20 employers by petition count]

Looking at the job title of applicants corroborates the dominance of the IT industry:


%sql
SELECT JOB_TITLE, COUNT(*) AS count
FROM hb1
GROUP BY JOB_TITLE
ORDER BY count DESC
LIMIT 20

[Image: top 20 job titles by petition count]

We could query further to understand how the IT industry shapes this data set.

Those were just a few queries using aggregations.  Since the underlying data frame is cached, they run very fast, in a few seconds.

Summary

We wanted to show how to load a CSV data set and then pre-process it a little to change some of its characteristics.

Here we “normalized” a field, i.e. we split the city and the state into two columns.  We renamed the id field.  Finally, we re-interpreted the full-time position field as a boolean.

In our case we do this with only one file, but we could just as easily have done it on multiple files.
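For instance, Spark’s readers accept glob patterns, so loading several CSV files sharing the same layout would look roughly like this (hypothetical file layout):


# Hypothetical example: load every CSV in the container in one go
multiDf = spark.read.option("header", "true").csv(pathPrefix + "*.csv")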

We chose to make those modifications using the RDD interface.  We could have done it using the data frame interface or even in SQL.  We found it more natural to do it with the RDD.
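For the curious, here is a sketch of what the data frame alternative could look like (this isn’t the approach we used above), with the same three changes expressed as data frame functions:


from pyspark.sql.functions import split, trim, col

# Same three changes, expressed with the Data Frame API instead of reworkRow
parts = split(col("WORKSITE"), ",")
alternative = df \
    .withColumnRenamed("_c0", "id") \
    .withColumn("CITY", trim(parts.getItem(0))) \
    .withColumn("STATE", trim(parts.getItem(1))) \
    .drop("WORKSITE") \
    .withColumn("FULL_TIME_POSITION", col("FULL_TIME_POSITION") == "Y")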
