We looked at Azure Databricks a few weeks ago.
Azure Databricks is a managed Apache Spark Cluster service.
More recently we looked at how to analyze a data set using Resilient Distributed Datasets (RDDs). We used the Social characteristics of the Marvel Universe public dataset, replicating some experiments we did 2 years ago with Azure Data Lake.
In this article, we are going to do the same analysis using Spark SQL & Data Frames and see how they address RDDs’ shortcomings. Again, we are going to use the Python API, but also SQL.
It is important to note that almost nothing in this article is specific to Azure Databricks: it would work with any distribution of Apache Spark.
Spark SQL & Data Frames
Spark SQL & Data Frames are well documented in the Apache Spark online documentation, which defines Spark SQL in these words:
“Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.”
- Structured data
- Extra information about data structure
- Extra optimizations using that extra information
With RDDs we are able to parse files with little structure and process them. We can also process structured files with RDDs but, as our experiment will make amply clear, they are nowhere near as efficient as Data Frames on structured data.
Typically we use RDDs to take unstructured data and either get insights directly from it or transform it into a shape that can be used by other components: Spark SQL or Data Warehouses (e.g. Azure SQL Data Warehouse).
Azure Databricks Workspace setup
The setup is the same as in our RDD article.
We still need a workspace and a cluster configured to connect to a storage account containing our data file.
We recommend cloning the notebook used in the previous article since the beginning is very similar.
Loading the data: RDDs
We’ll first load the data. This is identical to our previous article and we use RDDs:
# Fetch porgat.txt from storage account
pathPrefix = "wasbs://<CONTAINER>@<STORAGE ACCOUNT>/"
file = sc.textFile(pathPrefix + "porgat.txt")
To ensure future compatibility, we copied the dataset to GitHub, along with the notebook.
The placeholders CONTAINER and STORAGE ACCOUNT are for the name of the container where we copied the file and the name of the storage account owning that container.
Then let’s transform the data. Remember that all three RDDs (i.e. characters, publications & relationships) come from a single file.
# Remove the headers from the file: lines starting with a star
noHeaders = file.filter(lambda x: len(x) > 0 and x[0] != '*')
# Extract a pair from each line: the leading integer and a string for the rest of the line
paired = (noHeaders
    .map(lambda l: l.partition(' '))
    .filter(lambda t: len(t) == 3 and len(t[0]) > 0 and len(t[2]) > 0)
    .map(lambda t: (int(t[0]), t[2])))
# Filter relationships as they do not start with quotes, then split the integer list
# (pairs are indexed instead of unpacked in the lambda signature, so this also runs on Python 3)
scatteredRelationships = (paired
    .filter(lambda p: p[1][0] != '"')
    .map(lambda p: (p[0], [int(x) for x in p[1].split(' ')])))
# Relationships for the same character id sometimes span more than one line in the file, so let's group them together
relationships = scatteredRelationships.reduceByKey(lambda pubList1, pubList2: pubList1 + pubList2)
# Filter non-relationships as they start with quotes; remove the quotes
nonRelationships = (paired
    .filter(lambda p: p[1][0] == '"')
    .map(lambda p: (p[0], p[1][1:-1].strip())))
# Characters stop at a certain line (part of the initial header; we hardcode it here)
characters = nonRelationships.filter(lambda p: p[0] <= 6486)
# Publications start after the characters
publications = nonRelationships.filter(lambda p: p[0] > 6486)
Comments explain what each line does.
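To see what the three collections look like without a cluster, here is a minimal plain-Python sketch of the same parsing logic. The sample lines, the `parse` helper and the boundary value 3 are made up for illustration; the real file uses the hardcoded boundary 6486 shown above. The sketch also uses `split()` rather than `split(' ')` so that repeated spaces are tolerated, which is our own choice.

```python
# Plain-Python sketch of the parsing steps (no Spark needed).
# The sample lines and the boundary value are made up for illustration.
sample_lines = [
    '*Vertices 5 3',
    '1 "HERO A"',
    '2 "HERO B"',
    '3 "HERO C"',
    '4 "COMIC 1"',
    '5 "COMIC 2"',
    '*Edgeslist',
    '1 4 5',
    '2 4',
    '2 5',
    '3 5',
]
LAST_CHARACTER_ID = 3  # the article hardcodes 6486 for the real file


def parse(lines, last_character_id):
    """Split the lines into characters, publications and relationships."""
    characters = {}
    publications = {}
    relationships = {}
    for line in lines:
        # Skip empty lines and headers (lines starting with a star)
        if not line or line[0] == '*':
            continue
        head, _, rest = line.partition(' ')
        if not head or not rest:
            continue
        index = int(head)
        if rest[0] == '"':
            # Quoted text is a name: a character or a publication
            name = rest[1:-1].strip()
            target = characters if index <= last_character_id else publications
            target[index] = name
        else:
            # A character's publications may span several lines, so we append
            pub_ids = [int(x) for x in rest.split()]
            relationships.setdefault(index, []).extend(pub_ids)
    return characters, publications, relationships


characters, publications, relationships = parse(sample_lines, LAST_CHARACTER_ID)
print(characters)
print(publications)
print(relationships)
```

Note how character 2 has its publications spread over two lines in the sample and ends up with a single merged list, which is the role `reduceByKey` plays in the Spark version.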
Transitioning to Spark SQL: Data Frames
Spark SQL works with Data Frames, which are a kind of “structured” RDD or an “RDD with schema”.
The integration between the two works by creating an RDD of Row objects (a type from pyspark.sql) and then creating a Data Frame from it.
The Data Frames can then be registered as views. It is those views we’ll query using Spark SQL.
from pyspark.sql import Row

# Let's create data frames out of the RDDs and register them as temporary views for SQL to use

# Relationships has a list as a component, let's flatten it
flatRelationships = relationships.flatMap(lambda p: [(p[0], pubId) for pubId in p[1]])
# Let's map the relationships to an RDD of rows in order to create a data frame out of it
relationshipsDf = spark.createDataFrame(flatRelationships.map(lambda t: Row(charId=t[0], pubId=t[1])))
# Register relationships as a temporary view
relationshipsDf.createOrReplaceTempView("relationships")

# Let's do the same for characters
charactersDf = spark.createDataFrame(characters.map(lambda t: Row(charId=t[0], name=t[1])))
charactersDf.createOrReplaceTempView("characters")
# and for publications
publicationsDf = spark.createDataFrame(publications.map(lambda t: Row(pubId=t[0], name=t[1])))
publicationsDf.createOrReplaceTempView("publications")
We could easily come back to an RDD object with, for instance, publicationsDf.rdd.
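To make the flattening step concrete, here is a small plain-Python sketch of what turning grouped relationships into one row per (character, publication) pair looks like. `RelationshipRow` is a hypothetical stand-in for pyspark.sql.Row, and the data is made up; this only illustrates the shape of the data, not how Spark does it internally.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row, used only for this illustration
RelationshipRow = namedtuple("RelationshipRow", ["charId", "pubId"])

# Made-up grouped relationships: character id -> list of publication ids
relationships = {1: [4, 5], 2: [4, 5], 3: [5]}

# One row per (character, publication) pair: the shape the relationships view needs
flat_rows = [
    RelationshipRow(charId=char_id, pubId=pub_id)
    for char_id, pub_ids in sorted(relationships.items())
    for pub_id in pub_ids
]

for row in flat_rows:
    print(row)
```

Each row now has two named, scalar columns, which is what lets SQL join and group on charId and pubId.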
Querying using Spark SQL
Using our Python Notebook, we’ll now transition to SQL.
We can mix languages in the Notebook, as the online documentation explains, by simply starting a command with %<language>. In this case we start with %sql.
%sql
SELECT c1.name AS name1, c2.name AS name2, sub.charId1, sub.charId2, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(r1.pubId, r2.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  WHERE r1.charId < r2.charId AND r1.pubId=r2.pubId
  GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
ORDER BY sub.pubCount DESC
LIMIT 10
Here we see the power of Spark SQL. No longer do we have strange manipulation of RDDs where we flipped the data around to have some item in first position in order to sort or group it. We have plain SQL playing with the data in a very natural manner.
The query is also extremely fast compared to the RDD code from our last article, which performs the same work and gives the same results.
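The logic of the pair query can be tried on a toy dataset without a Spark cluster. The sketch below runs a close variant of it with Python's built-in sqlite3 module. This is a swap-in for illustration, not Spark SQL: the sample data is made up, `COUNT(*)` replaces the multi-column `COUNT` (which SQLite does not accept), and we added the names to the `ORDER BY` so that ties come out in a predictable order.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark temporary views
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE characters (charId INTEGER, name TEXT);
    CREATE TABLE relationships (charId INTEGER, pubId INTEGER);
""")
# Made-up sample data: three characters, two publications (ids 4 and 5)
conn.executemany("INSERT INTO characters VALUES (?, ?)",
                 [(1, "HERO A"), (2, "HERO B"), (3, "HERO C")])
conn.executemany("INSERT INTO relationships VALUES (?, ?)",
                 [(1, 4), (1, 5), (2, 4), (2, 5), (3, 5)])

PAIR_QUERY = """
SELECT c1.name AS name1, c2.name AS name2, sub.pubCount
FROM (
  SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(*) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  WHERE r1.charId < r2.charId AND r1.pubId = r2.pubId
  GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId = sub.charId1
INNER JOIN characters c2 ON c2.charId = sub.charId2
ORDER BY sub.pubCount DESC, name1, name2
LIMIT 10
"""

pair_rows = conn.execute(PAIR_QUERY).fetchall()
for row in pair_rows:
    print(row)
conn.close()
```

The condition r1.charId < r2.charId is what keeps each pair from being counted twice (once in each order) and excludes a character being paired with itself.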
The Spark SQL query ranks pairs of Marvel characters by their number of joint appearances in publications. Here is the query ranking trios:
%sql
SELECT c1.name AS name1, c2.name AS name2, c3.name AS name3, sub.charId1, sub.charId2, sub.charId3, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, r3.charId AS charId3, COUNT(r1.pubId, r2.pubId, r3.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  CROSS JOIN relationships AS r3
  WHERE r1.charId < r2.charId AND r2.charId < r3.charId AND r1.pubId=r2.pubId AND r2.pubId=r3.pubId
  GROUP BY r1.charId, r2.charId, r3.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
INNER JOIN characters c3 ON c3.charId=sub.charId3
ORDER BY sub.pubCount DESC
LIMIT 10
It is not much more complex, nor does it take much longer to execute.
We didn’t do that exercise with RDDs because it looked extremely tedious. A tool that makes it easy to do things that seemed “way too complex” with another one is a good indicator of its power.
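On a small sample, one way to sanity-check the pair and trio rankings is a plain-Python count using itertools.combinations. The helper name `count_joint_appearances` and the sample data are made up, and the sketch assumes each (character, publication) pair should count once. It is meant as a cross-check of the logic on toy data, not as an alternative that scales.

```python
from collections import Counter
from itertools import combinations


def count_joint_appearances(relationships, group_size):
    """Count shared publications for every group of `group_size` characters."""
    # Invert the mapping: publication id -> set of character ids appearing in it
    cast_by_publication = {}
    for char_id, pub_ids in relationships.items():
        for pub_id in set(pub_ids):
            cast_by_publication.setdefault(pub_id, set()).add(char_id)

    counts = Counter()
    for cast in cast_by_publication.values():
        # Sorting gives each group a canonical order, like charId1 < charId2 in the SQL
        for group in combinations(sorted(cast), group_size):
            counts[group] += 1
    return counts


# Made-up sample: character id -> publication ids
relationships = {1: [4, 5], 2: [4, 5], 3: [5]}
pair_counts = count_joint_appearances(relationships, 2)
trio_counts = count_joint_appearances(relationships, 3)
print(pair_counts.most_common(3))
print(trio_counts.most_common(3))
```

Going from pairs to trios only changes the group size here, much as the SQL version only adds one more self-join.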
On the surface Spark SQL looks like syntactic sugar on top of RDDs.
We hope the example we gave convinced you otherwise. Spark SQL is more concise, easier to read and faster to run by an order of magnitude.
That being said, it doesn’t mean we shouldn’t use RDDs. RDDs have their place with unstructured data or data preparation. Spark SQL isn’t useful in those cases.
We barely scratched the surface here. For instance, Spark SQL can deal with hierarchical data (e.g. JSON), which we didn’t show here.