We looked at Azure Databricks a few weeks ago. Azure Databricks is a managed Apache Spark Cluster service.
In this article, we are going to look at and use a fundamental building block of Apache Spark: the Resilient Distributed Dataset, or RDD. We are going to use the Python API (PySpark).
It is important to note that almost nothing in this article is specific to Azure Databricks: it would work with any distribution of Apache Spark.
We are going to replicate some experiments we did two years ago with Azure Data Lake. We are going to use the Social characteristics of the Marvel Universe public dataset.
Create an Azure Databricks workspace
Let’s start by creating an Azure Databricks workspace. This is well covered in the quickstart article of the online documentation, so we won’t repeat that here.
From there, we will go into the workspace and create a cluster, which is covered in the same quickstart article.
Next, we will create a notebook, choosing Python as the language, and attach it to the cluster we just created.
From here on, we’ll deviate from the quickstart article, because it uses Spark SQL, which we’ll cover in a future article.
Let’s create a storage account and a container within its blob storage. Let’s copy the only data file we are going to use, porgat.txt, into that container.
Now we need to give our cluster access to that container.
We do that by configuring the cluster. This is covered in the Azure Storage data source for Azure Databricks article. Please note that the configuration using spark.conf.set(…) doesn’t work with RDDs, so we need to use the on-cluster configuration.
We can configure either access keys or a SAS token. For demo or quick-learning purposes, access keys are faster to configure. In general, SAS tokens are more secure since they grant fewer privileges and are limited in time.
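As a rough illustration of what the on-cluster configuration contains, here is a small sketch that builds the Spark config line for either option. It assumes the `spark.hadoop.`-prefixed `fs.azure.account.key` and `fs.azure.sas` property names used by the WASB driver. The function name, the account name and the container name are hypothetical, so check the result against the documentation article mentioned above before relying on it.

```python
def blob_access_conf(storage_account, secret, container=None):
    """Build one cluster Spark-config line for Azure Blob Storage access.

    Without a container, `secret` is treated as a storage account access key.
    With a container, `secret` is treated as a SAS token for that container.
    """
    host = storage_account + ".blob.core.windows.net"
    if container is None:
        key = "spark.hadoop.fs.azure.account.key." + host
    else:
        key = "spark.hadoop.fs.azure.sas." + container + "." + host
    # Spark config entries are written as "<key> <value>", one per line
    return key + " " + secret


# Hypothetical names; the secrets are left as placeholders on purpose
print(blob_access_conf("mystorageaccount", "<ACCESS KEY>"))
print(blob_access_conf("mystorageaccount", "<SAS TOKEN>", container="marvel"))
```

The printed lines are meant to be pasted in the Spark configuration box of the cluster, not run in a notebook cell.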
Let’s look at the data.
The file is in three parts:
- Characters
- Publications
- Relationships between the two
Those three parts follow each other in the same file, which makes our data retrieval more convoluted than usual. We could have pre-processed the file in a text editor beforehand, since the entire file is less than 1 MB. But it’s a good exercise to work with the file directly, since we wouldn’t have that option with multiple big files.
So the first part looks like this:
    *Vertices 19428 6486
    1 "24-HOUR MAN/EMMANUEL"
    2 "3-D MAN/CHARLES CHAN"
    3 "4-D MAN/MERCURIO"
    4 "8-BALL/"
    5 "A"
    6 "A'YIN"
    7 "ABBOTT, JACK"
    8 "ABCISSA"
    9 "ABEL"
The header, i.e. the first row, which starts with an asterisk, tells us there are 6486 characters. Indeed, if we find the first occurrence of ‘6486’, we see that the names go back to the letter ‘A’ right after it:
    6484 "STORMER"
    6485 "TIGER WYLDE"
    6486 "ZONE"
    6487 "AA2 35"
    6488 "M/PRM 35"
    6489 "M/PRM 36"
    6490 "M/PRM 37"
    6491 "WI? 9"
    6492 "AVF 4"
This is where the publications start.
The header also told us there are 19428 entries in total, so publication IDs run from 6487 to 19428 (12942 publications). If we fast forward to 19428, we see a new header, Edgeslist:
    19427 "AA2 20"
    19428 "AA2 38"
    *Edgeslist
    1 6487
    2 6488 6489 6490 6491 6492 6493 6494 6495 6496
    3 6497 6498 6499 6500 6501 6502 6503 6504 6505
    4 6506 6507 6508
Although it looks like every line represents a character ID followed by a list of publication IDs, some characters span several lines, and their character ID is repeated on each one. For instance, character 10:
    10 6521 6522 6523 6524 6525 6526 6527 6528 6529 6530 6531 6532 6533 6534 6535
    10 6536 6537 6538 6539 6540 6541 6542 6543 6544 6545 6546 6547 6548 6549 6550
    10 6551 6552 6553 6554 6555 6556 6557 6558 6559 6560 6561 6562 6563 6564 6565
That is about it.
Datasets that come as text files often have bizarre and unique formats. Spark has good tools to deal with them.
The only challenge, and this is typical for Big Data tools, is dealing with several pieces stitched together in one file. Big Data platforms typically process a file in parallel, so it is difficult, and sometimes impossible, to get hold of the row number.
In this case, it’s easier to notice that publications start at a certain ID (i.e. 6487). This makes it easy to tell characters and publications apart.
Now the relationships use the character IDs, so they start over. We can notice, though, that relationships do not have quotes while both characters and publications do. This is how we’re going to differentiate them.
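Both rules (the quotes and the ID threshold) can be tried out in plain Python, without a cluster. The following is only a sketch on a handful of lines taken from the samples above. The `classify` function and the `CHARACTER_COUNT` name are illustrative and are not part of the notebook.

```python
CHARACTER_COUNT = 6486  # second number of the "*Vertices 19428 6486" header


def classify(line):
    """Return (kind, id, payload) for a data line, or None for headers and blank lines."""
    if len(line) == 0 or line[0] == '*':
        return None
    head, _, rest = line.partition(' ')
    if len(head) == 0 or len(rest) == 0:
        return None
    index = int(head)
    if rest[0] != '"':
        # No quotes: a character ID followed by a list of publication IDs
        return ('relationship', index, [int(x) for x in rest.split(' ')])
    name = rest[1:-1].strip()  # drop the surrounding quotes
    kind = 'character' if index <= CHARACTER_COUNT else 'publication'
    return (kind, index, name)


sample = [
    '*Vertices 19428 6486',
    '1 "24-HOUR MAN/EMMANUEL"',
    '6487 "AA2 35"',
    '*Edgeslist',
    '1 6487',
]
for line in sample:
    print(classify(line))
```

Each line is classified on its own content only, which is why the same logic carries over to a platform that reads the file in parallel.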
In the notebook, let’s simply copy this in the first cell:
    # Fetch porgat.txt file from storage account
    pathPrefix = "wasbs://<CONTAINER>@<STORAGE ACCOUNT>.blob.core.windows.net/"
    file = sc.textFile(pathPrefix + "porgat.txt")
The placeholders CONTAINER and STORAGE ACCOUNT are for the name of the container where we copied the file and the name of the storage account owning that container we created earlier.
“wasbs” stands for Windows Azure Storage Blob. It was created before Windows Azure was rebranded Microsoft Azure. It is an HDFS interface to Azure Blob Storage. This works because we configured the cluster with the access key or a SAS token for the blob storage.
The sc.textFile call doesn’t load the data yet; it merely points to it.
All the Python-Spark knowledge required to understand the following is in the Apache Spark online documentation of RDD. The documentation is well structured.
First, let’s define the different RDDs we are going to work on. In a new cell of the Notebook, let’s paste:
    # Remove the headers from the file: lines starting with a star
    noHeaders = file.filter(lambda x: len(x) > 0 and x[0] != '*')
    # Extract a pair from each line: the leading integer and a string for the rest of the line
    paired = (noHeaders
        .map(lambda l: l.partition(' '))
        .filter(lambda t: len(t) == 3 and len(t[0]) > 0 and len(t[2]) > 0)
        .map(lambda t: (int(t[0]), t[2])))
    # Filter relationships as they do not start with quotes, then split the integer list
    # (rows of paired are (id, text) pairs, so kv[0] is the id and kv[1] the text)
    scatteredRelationships = (paired
        .filter(lambda kv: kv[1][0] != '"')
        .mapValues(lambda text: [int(x) for x in text.split(' ')]))
    # Relationships for the same character id sometimes span more than one line in the file, so let's group them together
    relationships = scatteredRelationships.reduceByKey(lambda pubList1, pubList2: pubList1 + pubList2)
    # Filter non-relationships as they start with quotes; remove the quotes
    nonRelationships = (paired
        .filter(lambda kv: kv[1][0] == '"')
        .mapValues(lambda text: text[1:-1].strip()))
    # Characters stop at a certain id (part of the initial header; we hardcode it here)
    characters = nonRelationships.filter(lambda kv: kv[0] <= 6486)
    # Publications start after the characters
    publications = nonRelationships.filter(lambda kv: kv[0] > 6486)
When we run that, nothing gets computed and it returns quickly. This is because Spark uses lazy evaluation and we have only defined transformations so far.
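For readers new to lazy evaluation, a Python generator gives a reasonable analogy. This is not Spark code, only a sketch of the same idea: defining the pipeline does no work, consuming it does. All names here are made up for the illustration.

```python
calls = []


def noisy_upper(line):
    calls.append(line)  # record that real work happened
    return line.upper()


lines = ["a", "b", "c"]

# "Transformation": building the generator does not call noisy_upper yet
pipeline = (noisy_upper(l) for l in lines if l != "b")
calls_before = len(calls)

# "Action": consuming the generator triggers the work
result = list(pipeline)

print(calls_before)        # 0
print(result, len(calls))  # ['A', 'C'] 2
```

The analogy stops there: Spark also distributes the work across the cluster, which a generator does not.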
The comments explain what each line is doing. At the end of the block we have the three RDDs we were looking for in a proper format:
- relationships, where each row is key-value with the character ID as the key and the list of publication IDs as the value
- characters, also key-value with character ID / name
- publications, also key-value with publication ID / name
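To make the grouping step behind relationships more concrete, here is a plain-Python sketch of what reduceByKey does with list concatenation. The `reduce_by_key` helper is hypothetical and runs locally on a list of pairs. The sample rows are abridged from the character 1 and character 10 lines shown earlier.

```python
def reduce_by_key(pairs, combine):
    """Merge the values that share a key using combine, like a local reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = combine(merged[key], value) if key in merged else value
    return merged


# Character 10 is spread over three lines in the file (lists abridged here)
scattered = [
    (10, [6521, 6522]),
    (10, [6536, 6537]),
    (1, [6487]),
    (10, [6551]),
]

relationships = reduce_by_key(scattered, lambda a, b: a + b)
print(relationships[10])  # [6521, 6522, 6536, 6537, 6551]
print(relationships[1])   # [6487]
```

On a cluster, the order in which the partial lists are concatenated is not guaranteed, which doesn’t matter here since we only intersect them as sets later on.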
We can now do some work with those RDDs.
Let’s find out which two characters appear together most often in publications.
For this, we’ll take the relationship RDD, perform a Cartesian product of it with itself, and then do some filtering to get every possible pair of characters along with the list of publications common to both of them.
Although the relationship dataset isn’t big, performing a Cartesian product squares its size (with up to 6486 characters, that is up to about 42 million pairs). This raises the compute requirement to the point where the job takes a few seconds to run.
    # Let's find the characters appearing together most often
    # Let's take the relationship RDD and do a cartesian product with itself to get all possible duos; we repartition to be able to scale
    product = relationships.repartition(100).cartesian(relationships)
    # Rows of product are ((charId1, pubList1), (charId2, pubList2))
    # Let's then remap it to have the character ids together and intersect their publications (using Python's sets)
    remapped = product.map(lambda p: ((p[0][0], p[1][0]), list(set(p[0][1]) & set(p[1][1]))))
    # Rows are now ((charId1, charId2), pubList)
    # Let's eliminate duplicates
    noDuplicates = remapped.filter(lambda kv: kv[0][0] < kv[0][1])
    # Let's remove empty publication lists
    noEmptyPublications = noDuplicates.filter(lambda kv: len(kv[1]) > 0)
    # Let's flip the mapping in order to sort by length of publications & drop the publication lists themselves
    sortedPairs = noEmptyPublications.map(lambda kv: (len(kv[1]), kv[0])).sortByKey(False)
    # Action: let's output the first 10 results
    top10 = sortedPairs.take(10)
The last line is an action. Lazy evaluation ends here: the cluster finally gets a job to run. The job is divided into tasks dispatched to executors on different nodes of the cluster.
Again the comments detail what’s happening on each line.
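The same sequence of steps (Cartesian product, keep each pair once, intersect, drop empty intersections, sort by count) can be followed on a toy dataset in plain Python. The IDs below are made up for the illustration, and itertools.product stands in for the RDD cartesian call.

```python
from itertools import product

# Made-up (character id, publication ids) rows
relationships = [
    (1, [100, 101, 102]),
    (2, [101, 102]),
    (3, [102, 103]),
    (4, [200]),
]

pairs = []
for (char1, pubs1), (char2, pubs2) in product(relationships, relationships):
    if char1 < char2:  # keep each duo once and drop self-pairs
        common = set(pubs1) & set(pubs2)
        if common:  # drop duos with no publication in common
            pairs.append((len(common), (char1, char2)))

pairs.sort(reverse=True)  # highest count first
top = pairs[:2]
print(top)  # [(2, (1, 2)), (1, (2, 3))]
```

With 4 rows the product has 16 pairs, of which only 3 survive the filters, which gives a feel for how much of the squared dataset is thrown away.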
At the end, we computed a top 10 but didn’t display it yet. This is because it contains only IDs, which aren’t interesting to look at.
It is going to be much more interesting to look at the names of the characters. For that, we will join with the character RDD.
Although we could have joined before taking the top 10, that would have meant joining on every record and carrying the character names around, which would have been heavier. Instead, we first take the top 10, then perform the join on it, which is very fast:
    # Join once for the first character; we first need to flip the RDD to have charId1 as the key
    # Rows of top10 are (pubCount, (charId1, charId2))
    name1 = sc.parallelize(top10).map(lambda r: (r[1][0], (r[1][1], r[0]))).join(characters)
    # Rows of name1 are (charId1, ((charId2, pubCount), name1))
    # Let's perform a similar join on the second character
    name2 = name1.map(lambda r: (r[1][0][0], (r[1][1], r[0], r[1][0][1]))).join(characters)
    # Rows of name2 are (charId2, ((name1, charId1, pubCount), name2))
    # Let's format the RDD a bit
    formattedTop10 = name2.map(lambda r: (r[1][0][2], (r[1][0][0], r[1][0][1], r[1][1], r[0])))
    # We need to sort the results again: when we parallelized the top10 it got partitioned and each partition moved independently
    formattedTop10.sortByKey(False).collect()
We finally get the answer to our question:
    [(744, (u'HUMAN TORCH/JOHNNY S', 2557, u'THING/BENJAMIN J. GR', 5716)),
     (713, (u'HUMAN TORCH/JOHNNY S', 2557, u'MR. FANTASTIC/REED R', 3805)),
     (708, (u'MR. FANTASTIC/REED R', 3805, u'THING/BENJAMIN J. GR', 5716)),
     (701, (u'INVISIBLE WOMAN/SUE', 2650, u'MR. FANTASTIC/REED R', 3805)),
     (694, (u'HUMAN TORCH/JOHNNY S', 2557, u'INVISIBLE WOMAN/SUE', 2650)),
     (668, (u'INVISIBLE WOMAN/SUE', 2650, u'THING/BENJAMIN J. GR', 5716)),
     (616, (u'SPIDER-MAN/PETER PAR', 5306, u'WATSON-PARKER, MARY', 6166)),
     (526, (u'JAMESON, J. JONAH', 2959, u'SPIDER-MAN/PETER PAR', 5306)),
     (446, (u'CAPTAIN AMERICA', 859, u'IRON MAN/TONY STARK', 2664)),
     (422, (u'SCARLET WITCH/WANDA', 4898, u'VISION', 6066))]
It turns out the top 10 is dominated by members of the Fantastic Four, which makes sense since the four of them typically appear together.
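The "take the top 10 first, join afterwards" order used to produce these names can be sketched in plain Python, with a dictionary lookup standing in for the RDD join. The two pairs and the four names are copied from the result above. The variable names are only illustrative.

```python
# A small slice of the characters RDD, as a local dictionary
characters = {
    859: 'CAPTAIN AMERICA',
    2664: 'IRON MAN/TONY STARK',
    4898: 'SCARLET WITCH/WANDA',
    6066: 'VISION',
}

# Already reduced to the few rows we care about: (pubCount, (charId1, charId2))
top = [(446, (859, 2664)), (422, (4898, 6066))]

# Names are looked up only for the rows that survived the top-N cut
named = [
    (count, (characters[c1], c1, characters[c2], c2))
    for count, (c1, c2) in top
]
for row in named:
    print(row)
```

Only four lookups are needed here, whereas joining before the cut would have attached names to every pair of characters with a publication in common.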
Let’s make some observations before wrapping up.
Python Spark makes it easy to manipulate big data structures. The RDD abstraction allows us to perform set operations on arbitrarily large datasets in an elegant way.
Compute time is relatively fast given the task at hand.
RDDs aren’t typed: each row can have a different format, there are no “column” names, etc. This makes it easy to get started, but as the notebook gets bigger, we need to refer back to where an RDD was defined to remember its structure.
The RDD abstraction is great but quickly gets cumbersome when we try to perform sorts and joins: we then need to remap the RDD to get the right key in place.
We hope this was an easy enough introduction to Python Spark using RDD.
Although RDDs are both powerful and simple, we’ve identified two shortcomings: the lack of typing and the difficulty of expressing non-trivial queries.
Those shortcomings are actually addressed within the Spark platform by Data Frames & Spark SQL, as we’ll see in future articles.
Nevertheless, it is useful to understand RDDs as they often are the entry points for some Spark crunching, especially if the data isn’t structured or comes in non-standard files and needs some massaging before being ingested in a Data Frame.
UPDATE (24-01-2018): See Azure Databricks – Spark SQL – Data Frames for details.