Azure Databricks - Transforming Data Frames in Spark
In previous weeks, we’ve looked at Azure Databricks, Azure’s managed Spark cluster service.
We then looked at Resilient Distributed Datasets (RDDs) & Spark SQL / Data Frames.
We wanted to look further at Data Frames, with a bigger data set, and more precisely at some transformation techniques. We often say that most of the leg work in machine learning is data cleansing. Similarly, we can affirm that a clever & insightful aggregation query on a large dataset can only be executed after a considerable amount of work has gone into formatting, filtering & massaging the data: data wrangling.
Here, we’ll look at an interesting dataset, the H-1B Visa Petitions 2011-2016 (from Kaggle) and find some good insights with just a few queries, but also some data wrangling.
It is important to note that almost nothing in this article is specific to Azure Databricks; it would work with any distribution of Apache Spark.
The notebook used for this article is persisted on GitHub (see this article on how to import it in your Workspace).
H-1B is an employment-based, non-immigrant visa category for temporary foreign workers in the United States. For a foreign national to apply for an H-1B visa, a US employer must offer a job and petition for the H-1B visa with the US immigration department. This is the most common visa status applied for and held by international students once they complete college / higher education (Masters, PhD) and work in a full-time position.
Kaggle offers data versioning so we didn’t feel we needed to copy the dataset into GitHub. We used Version 2 of the dataset.
The data set is contained in a single CSV file of 0.5GB. It has few columns but a large number of rows (3 million):
We can notice a couple of cosmetic aspects that warrant some light wrangling:
- The ID column simply doesn’t have a name in the CSV file
- The WORKSITE column contains a city and a state separated by a comma; this is denormalized data and it can be useful to look at state or city separately (and we will)
- The FULL_TIME_POSITION column has values ‘Y’ and ‘N’; it should be a boolean
As in past articles, we use the Python SDK. We make sure our cluster is configured with the credentials of the storage account where we copied the data.
Then, we simply point to the file in the storage account.
# Replace with your container and storage account: "wasbs://<container>@<storage account>.blob.core.windows.net/"
pathPrefix = "wasbs://email@example.com/"
path = pathPrefix + "h1b_kaggle.csv"

# Load CSV
df = spark.read.option("header","true").csv(path)
Here we use the Spark session to load the data as a Data Frame. The Spark context is used to manipulate RDDs while the session is used for Spark SQL. Each interface offers different load methods, with the Spark session offering the more high-level ones (such as reading a CSV directly).
Here we load the file as a CSV, interpreting its header row. Since we don’t set the inferSchema option, every column is loaded as a string.
CSV load works well but we want to rework some columns. For that we’ll flip back to an RDD representation. To pass from a Data Frame df to its RDD representation we can simply use df.rdd.
That RDD will be an RDD of Row objects (i.e. strongly typed). A Row is a read-only object, which makes it cumbersome to manipulate as we need to repeat each existing column. An easier way to manipulate it is to use a Python dictionary representation.
So we define the following Python method to manipulate a row:
# Manipulate a data-row to change some columns
def reworkRow(row):
    from pyspark.sql import Row

    # Let's convert the data row to a dictionary
    # This is easier to manipulate as the dictionary isn't readonly
    fields = row.asDict()

    # Change the first column from _c0 to id
    fields['id'] = fields['_c0']
    del fields['_c0']

    # Split the WORKSITE column into city & state
    worksite = fields['WORKSITE'].split(',')
    city = worksite[0].strip() if len(worksite) > 0 else None
    state = worksite[1].strip() if len(worksite) > 1 else None
    fields['CITY'] = city
    fields['STATE'] = state
    del fields['WORKSITE']

    # Change FULL_TIME_POSITION column from 'Y' and 'N' to True / False (boolean)
    fields['FULL_TIME_POSITION'] = fields['FULL_TIME_POSITION'] == 'Y'

    return Row(**fields)
We address the three changes we wanted to address: id column, worksite column & full time position column.
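The dictionary logic can be tried out locally, without a cluster, by working on plain Python dictionaries. The following is a minimal sketch under that assumption: the helper names `split_worksite` and `rework_dict` and the sample row are made up for illustration and are not part of the original notebook. Unlike the notebook function, this sketch also tolerates a missing or empty WORKSITE value.

```python
def split_worksite(worksite):
    """Split 'CITY, STATE' into a (city, state) pair; missing parts become None."""
    parts = [p.strip() for p in (worksite or "").split(",")]
    city = parts[0] if parts[0] else None
    state = parts[1] if len(parts) > 1 and parts[1] else None
    return city, state


def rework_dict(d):
    """Apply the three changes to a copy of a row dictionary."""
    out = dict(d)  # copy, so the input is left untouched
    out["id"] = out.pop("_c0")
    out["CITY"], out["STATE"] = split_worksite(out.pop("WORKSITE"))
    out["FULL_TIME_POSITION"] = out["FULL_TIME_POSITION"] == "Y"
    return out


# Made-up sample row, only to exercise the logic
sample = {"_c0": "1", "WORKSITE": "ANN ARBOR, MICHIGAN", "FULL_TIME_POSITION": "N"}
reworked = rework_dict(sample)
```

In the notebook, the same dictionary would come from row.asDict() and be turned back into a Row before returning.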
We can then simply do a map on the RDD and recreate a data frame from the mapped RDD:
# Convert back to RDD to manipulate the rows
rdd = df.rdd.map(lambda row: reworkRow(row))
# Create a dataframe with the manipulated rows
hb1 = spark.createDataFrame(rdd)
# Let's cache this bad boy
hb1.cache()
# Create a temporary view from the data frame
hb1.createOrReplaceTempView("hb1")
We cached the data frame. Since the data set is 0.5GB on disk, it is useful to keep it in memory. The .cache method does a best effort job of keeping the data in the RAM of the worker nodes. This means that from one query to the next, the dataframe isn’t fully reconstructed. It does improve the performance drastically as we can easily test by commenting that command out.
We also register the data frame as a temporary view so we can use it in SQL.
This is the extent of the wrangling we do with this data set. Nothing very complicated, but as is often the case, we do more than just load the CSV and query it.
First, let’s check the size of the data set:
%sql
SELECT COUNT(*)
FROM hb1
This gives us a count of 3 002 458, which is a good-sized data set.
We can then check which states are most popular for immigration requests in the USA:
%sql
SELECT STATE, COUNT(*) AS petitions
FROM hb1
GROUP BY STATE
ORDER BY petitions DESC
The result is our first insight:
California being on top isn’t surprising. Texas being second is a little more interesting, although it is followed by New York & New Jersey. Those two may both reflect New York City’s pull, in which case together they would be 2nd.
If we break down by city, a different immigration portrait appears:
%sql
SELECT CITY, STATE, COUNT(*) AS petitions
FROM hb1
GROUP BY CITY, STATE
ORDER BY petitions DESC
Indeed, New York takes the lion’s share and Houston comes 2nd. Atlanta comes 4th while the state of Georgia was 9th.
This is a great example of drilling down to better understand the data.
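The same two aggregations can also be expressed with the data frame interface instead of SQL. This is a minimal sketch, assuming the cached `hb1` data frame from earlier; the helper name `count_petitions_by` is hypothetical and the PySpark import is deferred inside the function, as in reworkRow.

```python
def count_petitions_by(df, *columns):
    """Count rows per group of `columns`, largest groups first."""
    # Imported here so the helper can be defined before Spark is needed
    from pyspark.sql import functions as F

    return (
        df.groupBy(*columns)
          .agg(F.count("*").alias("petitions"))
          .orderBy(F.col("petitions").desc())
    )


# In the notebook, with the hb1 data frame defined earlier:
# by_state = count_petitions_by(hb1, "STATE")
# by_city = count_petitions_by(hb1, "CITY", "STATE")
```

Passing more columns is the drill-down: the grouping gets finer while the rest of the query stays the same.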
We could have a look at the distribution of the case status:
%sql
SELECT CASE_STATUS, COUNT(*) AS petitions
FROM hb1
GROUP BY CASE_STATUS
ORDER BY petitions DESC
A surprisingly low level of rejection.
Let’s look at the employers sponsoring those visas.
%sql
SELECT EMPLOYER_NAME, COUNT(*) as count
FROM hb1
GROUP BY EMPLOYER_NAME
ORDER BY count DESC
LIMIT 20
We see that the top employers are dominated by the IT industry.
Looking at the job title of applicants corroborates the dominance of the IT industry:
%sql
SELECT JOB_TITLE, COUNT(*) AS count
FROM hb1
GROUP BY JOB_TITLE
ORDER BY count DESC
LIMIT 20
We could query further to understand how the IT industry structures this data set.
Those were just a few queries using aggregations. Since the underlying data frame is cached, they run very fast: in a few seconds.
We wanted to show how to load a CSV data set and then pre-process it a little to change some of its characteristics.
Here we “normalized” a field, i.e. we split the city and the state into two columns. We renamed the id field. Finally, we re-interpreted the full time position field as a boolean.
In our case we do this with only one file, but we could just as easily have done it on multiple files.
We chose to do those modifications using the RDD interface. We could have done it using the data frame interface or even in SQL. We found it more natural to do it in the RDD.
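For comparison, here is a minimal sketch of what the data frame interface version might look like. It is an illustration, not the notebook’s code: the function name `rework_frame` is hypothetical, and its behavior differs slightly from reworkRow (a null FULL_TIME_POSITION stays null instead of becoming False, and how a WORKSITE without a comma is handled depends on the Spark version and its ANSI settings).

```python
def rework_frame(df):
    """Data frame version of the three changes: id, CITY / STATE, boolean flag."""
    # Imported here so the helper can be defined before Spark is needed
    from pyspark.sql import functions as F

    # Array column holding the comma-separated parts of WORKSITE
    parts = F.split(F.col("WORKSITE"), ",")

    return (
        df.withColumnRenamed("_c0", "id")
          .withColumn("CITY", F.trim(parts.getItem(0)))
          .withColumn("STATE", F.trim(parts.getItem(1)))
          .withColumn("FULL_TIME_POSITION", F.col("FULL_TIME_POSITION") == "Y")
          .drop("WORKSITE")
    )


# In the notebook, this would replace the RDD round trip:
# hb1 = rework_frame(df)
```

This keeps the work inside Spark SQL’s engine rather than passing every row through a Python function, which is usually the argument for this style.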