Azure Databricks – Spark SQL – Data Frames


We looked at Azure Databricks a few weeks ago.

Azure Databricks is a managed Apache Spark Cluster service.

More recently, we looked at how to analyze a dataset using Resilient Distributed Datasets (RDDs).  We used the Social characteristics of the Marvel Universe public dataset, replicating some experiments we did two years ago with Azure Data Lake.

In this article, we are going to do the same analysis using Spark SQL & Data Frames and see how they address RDDs’ shortcomings.  Again, we are going to use the Python SDK, but also SQL.

It is important to note that almost everything in this article isn’t specific to Azure Databricks and would work with any distribution of Apache Spark.

Spark SQL & Data Frames

Spark SQL & Data Frames are well documented in the Apache Spark online documentation, which defines Spark SQL in these words:

“Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.”

Keywords here:

  • Structured data
  • Extra information about data structure
  • Extra optimizations using that extra information
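
That last point is worth a quick illustration.  Here is a small sketch (using a throwaway data frame rather than our Marvel dataset, and assuming a cluster is attached) of how Spark exposes the optimized plan it derives from the schema:


#  A small sketch:  Spark uses the schema information to build and optimize a query plan
#  df is a throwaway data frame built with spark.range, not part of the Marvel dataset
df = spark.range(1000).withColumnRenamed("id", "charId")
#  explain(True) prints the parsed, analyzed, optimized and physical plans
df.filter(df.charId > 500).select("charId").explain(True)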

With RDDs we are able to parse files with little structure and process them.  We can also process structured files, but as the experiment below makes amply clear, it is nowhere near as efficient as using Data Frames on structured data.

Typically, we use RDDs to take unstructured data and either get insights directly from it or transform it into a shape that can be consumed by other components, such as Spark SQL or data warehouses (e.g. Azure SQL Data Warehouse).

Azure Databricks Workspace setup

The setup is the same as in our RDD article.

We still need a workspace and a cluster configured to connect to the storage account containing our data file.

We recommend cloning the notebook used in the previous article since the beginning is very similar.

Loading the data:  RDDs

We’ll first load the data.  This is identical to our previous article and we use RDDs:


#  Fetch porgat.txt from storage account
pathPrefix = "wasbs://<span style="display: inline !important; float: none; background-color: transparent; color: #333333; cursor: text; font-family: Georgia,'Times New Roman','Bitstream Charter',Times,serif; font-size: 16px; font-style: normal; font-variant: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: left; text-decoration: none; text-indent: 0px; text-transform: none; -webkit-text-stroke-width: 0px; white-space: normal; word-spacing: 0px;"><CONTAINER>@<STORAGE ACCOUNT></span>/"
file = sc.textFile(pathPrefix + "porgat.txt")

To ensure future availability, we copied the dataset to GitHub, along with the notebook (see this article on how to import it into your workspace).

The placeholders CONTAINER and STORAGE ACCOUNT are for the name of the container where we copied the file and the name of the storage account owning that container.

Then let’s transform the data.  Remember that the three RDDs (i.e. characters, publications & relationships) all come from a single file.


#  Remove the headers from the file:  lines starting with a star
noHeaders = file.filter(lambda x: len(x)>0 and x[0]!='*')
#  Extract a pair from each line:  the leading integer and a string for the rest of the line
paired = noHeaders.map(lambda l:  l.partition(' ')).filter(lambda t:  len(t)==3 and len(t[0])>0 and len(t[2])>0).map(lambda t: (int(t[0]), t[2]))
#  Filter relationships as they do not start with quotes, then split the integer list (each pair is (charId, text))
scatteredRelationships = paired.filter(lambda p:  p[1][0]!='"').map(lambda p: (p[0], [int(x) for x in p[1].split(' ')]))
#  Relationships for the same character id sometimes span more than one line in the file, so let's group them together
relationships = scatteredRelationships.reduceByKey(lambda pubList1, pubList2: pubList1 + pubList2)
#  Filter non-relationships as they start with quotes; remove the quotes
nonRelationships = paired.filter(lambda p:  p[1][0]=='"').map(lambda p:  (p[0], p[1][1:-1].strip()))
#  Characters stop at a certain id (part of the initial header; we hardcode it here)
characters = nonRelationships.filter(lambda p: p[0]<=6486)
#  Publications start after the characters
publications = nonRelationships.filter(lambda p: p[0]>6486)

Comments explain what each line does.

Transitioning to Spark SQL:  Data Frames

Spark SQL works with Data Frames, which are a kind of “structured” RDD, or an “RDD with a schema”.

The integration between the two works by creating an RDD of Row objects (Row is a type from pyspark.sql) and then creating a Data Frame from it.

The Data Frames can then be registered as views.  It is those views we’ll query using Spark SQL.


from pyspark.sql import Row

#  Let's create dataframes out of the RDDs and register them as temporary views for SQL to use

#  Relationships have a list of publication ids as their value; let's flatten that into (charId, pubId) pairs
flatRelationships = relationships.flatMap(lambda t:  [(t[0], pubId) for pubId in t[1]])
#  Let's map the relationships to an RDD of rows in order to create a data frame out of it
relationshipsDf = spark.createDataFrame(flatRelationships.map(lambda t: Row(charId=t[0], pubId=t[1])))
#  Register relationships as a temporary view
relationshipsDf.createOrReplaceTempView("relationships")

#  Let's do the same for characters
charactersDf = spark.createDataFrame(characters.map(lambda t:  Row(charId=t[0], name=t[1])))
charactersDf.createOrReplaceTempView("characters")

#  and for publications
publicationsDf = spark.createDataFrame(publications.map(lambda t:  Row(pubId=t[0], name=t[1])))
publicationsDf.createOrReplaceTempView("publications")

We could easily go back to an RDD with, for instance, publicationsDf.rdd.
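
Here is a minimal sketch of that round trip, using the publicationsDf data frame defined above:


#  A minimal sketch of the round trip between Data Frames and RDDs
publicationsRdd = publicationsDf.rdd                 #  back to an RDD; each element is a Row
print(publicationsRdd.take(3))                       #  fields are accessed by name, e.g. row.name
backToDf = spark.createDataFrame(publicationsRdd)    #  and back to a Data Frame again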

Querying using Spark SQL

Using our Python Notebook, we’ll now transition to SQL.

We can mix languages in the Notebook, as the online documentation explains, by simply starting a command with %<language>.  In this case we start with %sql.


%sql

SELECT c1.name AS name1, c2.name AS name2, sub.charId1, sub.charId2, sub.pubCount
FROM
(
SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(r1.pubId, r2.pubId) AS pubCount
FROM relationships AS r1
CROSS JOIN relationships AS r2
WHERE r1.charId < r2.charId
AND r1.pubId=r2.pubId
GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
ORDER BY sub.pubCount DESC
LIMIT 10

Here we see the power of Spark SQL.  No longer do we need strange manipulations of RDDs where we flip the data around to put some item in first position in order to sort or group by it.  We have plain SQL playing with the data in a very natural manner.

It is also extremely fast compared to the RDD code from our last article, which performed the same work and gave the same results.
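
For comparison, the same duo ranking can also be expressed directly in Python with the Data Frame API, without the %sql magic.  Here is a sketch, assuming the relationshipsDf and charactersDf data frames defined above (it should return the same top 10):


from pyspark.sql import functions as F

#  A sketch of the duo ranking using the Data Frame API instead of SQL
r1 = relationshipsDf.alias("r1")
r2 = relationshipsDf.alias("r2")
duos = (r1
        .join(r2, (F.col("r1.pubId") == F.col("r2.pubId")) & (F.col("r1.charId") < F.col("r2.charId")))
        .groupBy(F.col("r1.charId").alias("charId1"), F.col("r2.charId").alias("charId2"))
        .agg(F.count("*").alias("pubCount"))
        .join(charactersDf.alias("c1"), F.col("charId1") == F.col("c1.charId"))
        .join(charactersDf.alias("c2"), F.col("charId2") == F.col("c2.charId"))
        .select(F.col("c1.name").alias("name1"), F.col("c2.name").alias("name2"), "charId1", "charId2", "pubCount")
        .orderBy(F.col("pubCount").desc())
        .limit(10))
duos.show()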

Both forms rank Marvel character duos by their number of joint appearances in publications.  Here is the SQL query ranking trios:


%sql

SELECT c1.name AS name1, c2.name AS name2, c3.name AS name3, sub.charId1, sub.charId2, sub.charId3, sub.pubCount
FROM
(
SELECT r1.charId AS charId1, r2.charId AS charId2, r3.charId AS charId3, COUNT(r1.pubId, r2.pubId, r3.pubId) AS pubCount
FROM relationships AS r1
CROSS JOIN relationships AS r2
CROSS JOIN relationships AS r3
WHERE r1.charId < r2.charId
AND r2.charId < r3.charId
AND r1.pubId=r2.pubId
AND r2.pubId=r3.pubId
GROUP BY r1.charId, r2.charId, r3.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
INNER JOIN characters c3 ON c3.charId=sub.charId3
ORDER BY sub.pubCount DESC
LIMIT 10

It is not much more complex, nor does it take much longer to execute.

We didn’t do that exercise with RDDs because it looked extremely tedious.  This is a good indicator of the power of a tool:  with it, we can do things that seemed “way too complex” with another one.

Summary

On the surface, Spark SQL looks like syntactic sugar on top of RDDs.

We hope the examples we gave convinced you otherwise.  Spark SQL is more concise, easier to read and faster to run by an order of magnitude.

That being said, it doesn’t mean we shouldn’t use RDDs.  RDDs have their place with unstructured data or data preparation.  Spark SQL isn’t useful in those cases.

We barely scratched the surface here.  For instance, Spark SQL can deal with hierarchical data (e.g. JSON), which we didn’t show here.
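
As a quick, hypothetical sketch (the comics.json file and its fields are made up for illustration), nested JSON can be loaded and queried with dot notation:


#  A hypothetical sketch:  comics.json and its fields (title, creators.writer) are made up for illustration
comicsDf = spark.read.json(pathPrefix + "comics.json")
comicsDf.printSchema()                               #  Spark infers a nested schema from the JSON documents
comicsDf.createOrReplaceTempView("comics")
#  Nested fields are queried with dot notation
spark.sql("SELECT title, creators.writer FROM comics LIMIT 10").show()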
