Azure Databricks – Parsing escaping CSV files in Spark


In previous weeks, we’ve looked at Azure Databricks, Azure’s managed Spark cluster service.

We then looked at Resilient Distributed Datasets (RDDs) and Spark SQL / Data Frames.  We also looked at an example of more tedious transformation prior to querying, using the H-1B Visa Petitions 2011-2016 data set (from Kaggle).

Here, we’re going to look at some more involved pre-processing, using the TED Talks data set (from Kaggle).  More specifically, we are going to work around Spark’s limitations in parsing CSV files.

It is important to note that almost everything in this article is not specific to Azure Databricks and would work with any distribution of Apache Spark.

The notebook used for this article is persisted on GitHub (see this article on how to import it into your Workspace).

The Data

From Kaggle:

These datasets contain information about all audio-video recordings of TED Talks uploaded to the official TED.com website until September 21st, 2017. The TED main dataset contains information about all talks including number of views, number of comments, descriptions, speakers and titles. The TED transcripts dataset contains the transcripts for all talks available on TED.com.

Kaggle offers data versioning so we didn’t feel we needed to copy the dataset into GitHub.  We used Version 3 of the dataset.

We will only work with the ted_main.csv file, which is 7 MB, with 2,550 rows and 17 columns.

Here are the first 12 columns:

[Screenshot: the first 12 columns of ted_main.csv]

and the last 5:

[Screenshot: the last 5 columns of ted_main.csv]

The Challenge

A first few observations:

  • There are three columns holding collections of complex (JSON) data: ratings, related_talks & tags.
  • The JSON of those complex columns is malformed; more precisely, it uses single quotes instead of the double quotes mandated by the JSON spec, yet it does use double quotes around fields that contain a single quote.
  • A few columns use CSV escaping, with commas and quotes inside a field.
  • Some rows span two lines; the line break occurs inside a quoted field.

It basically is a pretty “hard” CSV to parse.  For those reasons, if we use the standard CSV reader of the Spark session (i.e. spark.read.csv(path)), we won’t get what we need.

Actually, for some reason, some columns run over others.  It seems Spark’s CSV parser doesn’t fully support the CSV spec at the time of this writing (i.e. mid-January 2018).
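
For reference, here is a minimal sketch of what that naive attempt looks like (it uses the path variable defined in the Wrangling section below); inspecting a few rows of the JSON-heavy columns quickly shows the problem:


# Naive attempt (sketch for illustration):  let Spark parse the CSV directly
naiveDf = spark.read.csv(path, header=True)
# Looking at the complex columns reveals content running over into other columns
naiveDf.select("name", "ratings", "tags").show(5, truncate=False)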

We’ll therefore need to use some more low-level code to get the structure we want.

Wrangling

As in past articles, we use the Python SDK.  We make sure our cluster is configured with the credentials of the storage account where we copied the data.
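
The notebook relies on the cluster configuration for those credentials; as an alternative (a sketch with placeholder account name and key), they can also be set from the notebook session itself:


# Alternative to cluster-level configuration (placeholder account name and key)
spark.conf.set(
  "fs.azure.account.key.<storage account>.blob.core.windows.net",
  "<storage account access key>")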

Then, we simply point to the file in the storage account.


#  Replace with your container and storage account:  "wasbs://<container>@<storage account>.blob.core.windows.net/"
pathPrefix = "wasbs://ted@vpldb.blob.core.windows.net/"
path = pathPrefix + "ted_main.csv"

We’ll then do the heavy lifting:


import csv
import StringIO

# Load the data as one big string
# We do this because Spark is unable to parse the CSV correctly due to some escaping
text = sc.wholeTextFiles(path).take(1)[0][1]
# Use Python's csv module to parse the content
lines = [v for v in csv.reader(StringIO.StringIO(text.encode('utf8', 'ignore')))]
# Take the first row as column names
columnNames = lines[0]
# Take the rest of the rows as content
content = sc.parallelize(lines[1:])
# Filter out rows that wouldn't have the right number of columns
compliant = content.filter(lambda v: len(v)==len(columnNames))
# Map list-rows to dictionaries using the column names
talkDict = compliant.map(lambda r: dict(zip(columnNames, r)))

Since Spark’s CSV parser won’t work and rows run over multiple lines, we load the entire file in memory as a single string.

We then use Python’s csv module to parse the rows.  That module requires a stream, which is why we use the StringIO module to build a stream from the string.

We then have a list (of rows) of lists (of columns).  The first row contains the column names, while the rest is the content.  We parallelize the content so it can be processed on multiple nodes.

We then filter out rows whose column count differs from the number of column names.

We then create a dictionary out of each row by zipping the column names with the row content.
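
A quick sanity check at this point (not part of the original notebook) is to look at the column names and at how many rows survived the filter:


# Quick sanity check (sketch):  17 column names and one dictionary per clean row
print(columnNames)                 # the 17 column names from the header row
print(compliant.count())           # rows with the expected number of columns
print(talkDict.first()['title'])   # e.g. the title of the first talk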

We will then use some Python functions to rework the content.

As mentioned previously, the JSON is malformed and won’t be read by Python’s JSON parser.  Fortunately, the single-quoted JSON happens to be valid Python literal syntax (dictionaries and lists), which we can parse using the ast module.


def parse(singleQuotedJson):
    import ast

    return ast.literal_eval(singleQuotedJson)

def reworkFields(d):
    # Parse integers since Python's CSV parser only parses strings
    d['comments'] = int(d['comments'])
    d['duration'] = int(d['duration'])
    d['film_date'] = int(d['film_date'])
    d['num_speaker'] = int(d['num_speaker'])
    d['published_date'] = int(d['published_date'])
    d['views'] = int(d['views'])

    # Parse JSON columns (into Python lists / dictionaries)
    d['ratings'] = parse(d['ratings'])
    d['related_talks'] = parse(d['related_talks'])
    d['tags'] = parse(d['tags'])

    return d

def cleanDenormalizedAttributes(d):
    # Remove denormalized properties
    del d['ratings']
    del d['related_talks']
    del d['tags']

    return d

On top of parsing the JSON, the rework function also forces some fields to be integers (Python’s CSV parser returns everything as strings).
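
To illustrate the parse function (the values below are made up but follow the shape of the ratings column), ast.literal_eval accepts the single-quoted variant that json.loads would reject:


# Illustration only; the values are made up for the example
sample = "[{'id': 7, 'name': 'Funny', 'count': 19645}]"
print(parse(sample))    # [{'id': 7, 'name': 'Funny', 'count': 19645}]
# json.loads(sample) would raise a ValueError because of the single quotes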


# Rework some fields
cleanFields = talkDict.map(lambda r: reworkFields(r))
# Extract ratings as a separate RDD linked to the talks one with the talk name
ratings = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'id':r['id'], 'name':r['name'], 'count':r['count']} for r in d['ratings']])
# Extract related talks, similarly linked to talk name
relatedTalks = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'relatedTalkName':r['title']} for r in d['related_talks']])
# Extract tags, similarly linked to talk name
tags = cleanFields.flatMap(lambda d: [{'talkName':d['name'], 'tag':t} for t in d['tags']])
# Normalize the talkDict by removing denormalized attributes
normalizedTalks = cleanFields.map(lambda d:  cleanDenormalizedAttributes(d))

We first rework the data content.  We then extract the three complex fields as separate Resilient Distributed Datasets (RDDs).  Finally, we remove the data used by those three new RDDs from the original RDD.

We basically normalized the data.  Spark supports complex fields, so we could have kept everything within one data set.  We did experience difficulties trying that (e.g. the name property in ratings would be null everywhere for some reason), so we went with the simpler approach of normalization.

We then create data frames from each RDD and register those data frames as temporary views to be queried with SQL.

We also cache the data frames so that subsequent queries won’t recalculate them each time.


from pyspark.sql import Row

# Create data frames, cache them and register them as temp views
normalizedTalksDf = spark.createDataFrame(normalizedTalks.map(lambda d: Row(**d)))
normalizedTalksDf.cache()
normalizedTalksDf.createOrReplaceTempView("talks")

ratingsDf = spark.createDataFrame(ratings.map(lambda d: Row(**d)))
ratingsDf.cache()
ratingsDf.createOrReplaceTempView("ratings")

relatedTalksDf = spark.createDataFrame(relatedTalks.map(lambda d: Row(**d)))
relatedTalksDf.cache()
relatedTalksDf.createOrReplaceTempView("relatedTalks")

tagsDf = spark.createDataFrame(tags.map(lambda d: Row(**d)))
tagsDf.cache()
tagsDf.createOrReplaceTempView("tags")

We here have a much meatier pre-processing of the data than we had in previous articles.  It is still simple, as the data itself is clean (i.e. there is no missing data and there are no malformed fields per se).

Going back to RDDs serves us well, as we have much better control at that level.  If the file had been too big to hold in memory, we would have had a challenge.  On the other hand, treating multiple files that way would have been trivial.
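
Here is a sketch (not from the original notebook, with a hypothetical folder name) of what the multiple-file variant could look like; each file gets parsed independently on the executors:


def parseFile(content):
    # Parse one whole file with Python's csv module, exactly as above
    import csv
    import StringIO
    return list(csv.reader(StringIO.StringIO(content.encode('utf8', 'ignore'))))

# Hypothetical folder containing several CSV files with the same layout
multiContent = sc.wholeTextFiles(pathPrefix + "talks/") \
    .flatMap(lambda pathAndText: parseFile(pathAndText[1])) \
    .filter(lambda row: row != columnNames)  # drop each file's header row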

Insights

First, let’s check the size of each data frame:


%sql

SELECT
  (SELECT COUNT(*) FROM talks) AS talkCount,
  (SELECT COUNT(*) FROM ratings) AS ratingTalkCount,
  (SELECT COUNT(*) FROM relatedTalks) AS relatedTalkCount,
  (SELECT COUNT(*) FROM tags) AS tagCount

This gives us the following cardinalities:

[Screenshot: row counts for talks, ratings, relatedTalks and tags]

So despite the small number of TED Talks covered, the file does contain a good chunk of data.

We can then ask what the top ten talks are, using the number of views as the success factor.  We can also check whether a talk that is viewed a lot generates a lot of comments; for this, we’ll compare the number of comments with the number of views:


%sql

SELECT title, main_speaker, views, ROUND(1000000*comments/views, 1) AS commentsPerMillionViews
FROM talks
ORDER BY views DESC
LIMIT 10

[Screenshot: top ten talks by views, with comments per million views]

In general, it seems that highly viewed talks generate a lot of comments.
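
To go beyond eyeballing the top ten (this is not in the original notebook), we could also compute the correlation between views and comments over all talks:


# Pearson correlation between views and comments across all talks (sketch)
print(normalizedTalksDf.stat.corr("views", "comments"))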

Let’s look at tags and ask which tags are associated with popular talks:


%sql

SELECT ROUND(AVG(t.views)) AS avgViews, tg.tag
FROM talks AS t
INNER JOIN tags tg ON tg.talkName=t.name
GROUP BY tg.tag
ORDER BY avgViews DESC

[Screenshot: tags ordered by average views]

Apparently body language is the way to go to be popular at TED.

Let’s then check the ratings.  Ratings are a little more complicated to analyze since, for one specific talk, there will be multiple ratings in different categories.

Let’s first look at the rating categories ordered by the number of ratings (not talk views, but the number of times the same category was given).


%sql

SELECT name, SUM(count) AS ratingCount
FROM ratings
GROUP BY name
ORDER BY ratingCount DESC

[Screenshot: rating categories ordered by total rating count]

So inspiring speeches are rated often.  This is consistent with our experience of TED talks.

Finally, let’s dig a little further into the ratings.  Since Inspiring is the most frequently given rating, let’s rank the talks by how often they received that rating:


%sql

SELECT t.title, t.main_speaker, t.views, r.count
FROM talks AS t
INNER JOIN ratings AS r ON r.talkName = t.name AND r.name="Inspiring"
ORDER BY r.count DESC

[Screenshot: talks ranked by number of Inspiring ratings]

We see that, in general, the Inspiring rating is a good predictor of a talk’s popularity, but it doesn’t reproduce the entire top 10 in the right order (e.g. the fourth talk here was second in terms of views).

Summary

We went a bit further in terms of pre-processing files using Python and RDDs.

It shows a great strength of Spark, one we appreciate in a good framework:  easy cases are easy because you treat them at the highest abstraction level, while harder cases are only as hard as they need to be.  We didn’t need to work around the Spark abstractions:  we simply went a level down (i.e. to RDDs), solved the problem and popped back up to the data frame abstraction.
