
Apache Spark: An Introduction

Overview

After lots of ground-breaking work led by the UC Berkeley AMP Lab, Apache Spark was developed to utilize distributed, in-memory data structures to improve data processing speeds over Hadoop for most workloads. In this post, we're going to cover the architecture of Spark and basic transformations and actions using a real dataset. If you want to write and run your own Spark code, check out the interactive version of this post on Dataquest.

Resilient Distributed Datasets (RDDs)

The core data structure in Spark is an RDD, or a resilient distributed dataset. As the name suggests, an RDD is Spark's representation of a dataset that is distributed across the RAM, or memory, of lots of machines. An RDD object is essentially a collection of elements that you can use to hold lists of tuples, dictionaries, lists, etc. Similar to DataFrames in Pandas, you load a dataset into an RDD and can then run any of the methods accessible to that object.

PySpark - Apache Spark in Python

While Spark is written in Scala, a language that compiles down to bytecode for the JVM, the open source community has developed a wonderful toolkit called PySpark that allows you to interface with RDDs in Python. Thanks to a library called Py4J, Python can interface with JVM objects, in our case RDDs, and this library is one of the tools that makes PySpark work.

To start off, we'll load the dataset containing all of the Daily Show guests into an RDD. We are using the TSV version of FiveThirtyEight's dataset. TSV files are separated, or delimited, by a tab character "\t" instead of a comma "," like in a CSV file.


raw_data = sc.textFile("daily_show.tsv")
raw_data.take(5)

['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']

SparkContext

SparkContext is the object that manages the connection to the clusters in Spark and coordinates running processes on the clusters themselves. SparkContext connects to cluster managers, which manage the actual executors that run the specific computations. Here's a diagram from the Spark documentation to better visualize the architecture:

[Figure: Spark cluster overview diagram]

The SparkContext object is usually referenced as the variable sc. We then run:

raw_data = sc.textFile("daily_show.tsv")

to read the TSV dataset into an RDD object raw_data. The RDD object raw_data closely resembles a List of String objects, one object for each line in the dataset. We then use the take() method to print the first 5 elements of the RDD:

raw_data.take(5)

To explore the other methods an RDD object has access to, check out the PySpark documentation. take(n) will return the first n elements of the RDD.

Lazy Evaluation

One question you may have is: if an RDD resembles a Python List, why not just use bracket notation to access its elements? Because RDD objects are distributed across lots of partitions, we can't rely on the standard implementation of a List, so the RDD object was developed specifically to handle the distributed nature of the data. One advantage of the RDD abstraction is the ability to run Spark locally on your own computer. When running locally, Spark simulates distributing your calculations over lots of machines by slicing your computer's memory into partitions, with no tweaking or changes to the code you wrote.

Another advantage of Spark's RDD implementation is the ability to lazily evaluate code, postponing running a calculation until absolutely necessary. In the code above, Spark waited to load the TSV file into an RDD until raw_data.take(5) was run. When raw_data = sc.textFile("daily_show.tsv") was called, a pointer to the file was created, but only when raw_data.take(5) needed the file to run its logic was the text file actually read into raw_data. We will see more examples of this lazy evaluation in this lesson and in future lessons.
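To get a feel for lazy evaluation without a Spark installation, here is a minimal plain-Python sketch that uses a generator as a rough analogue. It is not how Spark is implemented. The names lazy_lines, reads, and the shortened sample lines are assumptions made up for this illustration.

```python
from itertools import islice

reads = []  # records every line that is actually pulled from the source


def lazy_lines(source):
    """Yield lines one at a time; nothing runs until someone iterates."""
    for line in source:
        reads.append(line)
        yield line


# Shortened, made-up sample lines standing in for the TSV file.
source = ["YEAR\tShow", "1999\t1/11/99", "1999\t1/12/99", "1999\t1/13/99"]

raw = lazy_lines(source)           # roughly like sc.textFile(): no work done yet
reads_before = len(reads)

first_two = list(islice(raw, 2))   # roughly like take(2): pulls only what is needed
reads_after = len(reads)

print(reads_before, reads_after, first_two)
```

Creating the generator reads nothing, and asking for two elements reads exactly two lines. Spark's behavior with partitions is more involved, but the idea of deferring work until a result is requested is the same.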

Pipelines

Spark borrowed heavily from Hadoop's Map-Reduce pattern, but is quite different in many ways. If you have experience with Hadoop and traditional Map-Reduce, read this great post by Cloudera on the difference. Don't worry if you have never worked with Map-Reduce or Hadoop before as we'll cover the concepts you need to know in this course.

The key idea to understand when working with Spark is data pipelining. Every operation or calculation in Spark is essentially a series of steps that can be chained together and run in succession to form a pipeline. Each step in the pipeline returns either a Python value (e.g. Integer), a Python data structure (e.g. Dictionary) or an RDD object. We'll first start with the map() function.

Map()

The map(f) function applies the function f to every element in the RDD. Since RDDs are iterable, like most Python collections, Spark runs the function f on every element and returns a new RDD.

We'll walk through a map example so you can get a better sense of how it works. If you look carefully, raw_data is in a format that's hard to work with. While currently each element is a String, we'd like to transform every element into a List so the data is more manageable. While traditionally, we would:

1. Use a `for` loop to iterate over the collection
2. Split each `String` on the delimiter
3. Store the result in a `List`

let's walk through how we use map to do this in Spark.

In the below code block, we:

1. Call the RDD function `map()` to specify we want the enclosed logic to be applied to every line in our dataset
2. Write a lambda function to split each line using the tab delimiter "\t" and assign the resulting RDD to `daily_show`
3. Call the RDD function `take()` on `daily_show` to display the first 5 elements (or rows) of the resulting RDD

The map(f) function is known as a transformation step and either a named or lambda function f is required.


daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]
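
For comparison, the traditional loop-based approach can be sketched in plain Python next to Python's built-in map(). This runs on an ordinary list rather than an RDD, and raw_lines is a small sample copied from the output above, so treat it as an illustration of the idea rather than Spark code.

```python
# A few lines copied from the dataset preview, held in an ordinary Python list.
raw_lines = [
    "YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List",
    "1999\tactor\t1/11/99\tActing\tMichael J. Fox",
    "1999\tComedian\t1/12/99\tComedy\tSandra Bernhard",
]

# Traditional approach: loop, split each string, and store the result in a list.
split_rows = []
for line in raw_lines:
    split_rows.append(line.split("\t"))

# Built-in map() applies the same lambda to every element.
# Like RDD.map(), it leaves raw_lines untouched and produces a new result.
mapped_rows = list(map(lambda line: line.split("\t"), raw_lines))

print(mapped_rows[1])
```

Both versions give the same rows. The difference in Spark is that map() describes the work once and lets Spark decide where and when to run it.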

Python and Scala, friends forever

One of the wonderful features of PySpark is the ability to separate our logic, which we prefer to write in Python, from the actual data transformation. In the above code block, we wrote a lambda function in Python code:

raw_data.map(lambda line: line.split('\t'))

but got to take advantage of Scala when Spark actually ran the code over our RDD. This is the power of PySpark. Without learning any Scala, we get to harness the data processing performance gains from Spark's Scala architecture. Even better, when we ran:

daily_show.take(5)

the results were returned to us in Python friendly notation.

Transformations and Actions

In Spark, there are two types of methods:

1. Transformations - map(), reduceByKey()
2. Actions - take(), reduce(), saveAsTextFile(), collect()

Transformations are lazy operations and always return a reference to an RDD object. The transformation, however, is not actually run until an action needs to use the resulting RDD from a transformation. Any function that returns an RDD is a transformation and any function that returns a value is an action. These concepts will become more clear as you work through this lesson and practice writing PySpark code.
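
To make the distinction concrete, here is a toy sketch in plain Python. ToyRDD is a hypothetical class invented for this post, not part of PySpark, and it ignores partitions and clusters entirely. It only mirrors the rule described above: transformations record a step and return a new object, while actions run the recorded steps and return plain values.

```python
from itertools import islice


class ToyRDD:
    """Hypothetical stand-in for an RDD; not part of PySpark."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # recorded transformations, not yet run

    # Transformations: record the step and return a new ToyRDD.
    def map(self, f):
        return ToyRDD(self._data, self._steps + (("map", f),))

    def filter(self, f):
        return ToyRDD(self._data, self._steps + (("filter", f),))

    def _run(self):
        # Push each element through every recorded step, in order.
        for item in self._data:
            keep = True
            for kind, f in self._steps:
                if kind == "map":
                    item = f(item)
                elif not f(item):
                    keep = False
                    break
            if keep:
                yield item

    # Actions: execute the recorded steps and return plain Python values.
    def take(self, n):
        return list(islice(self._run(), n))

    def collect(self):
        return list(self._run())


calls = []  # tracks when the mapped function actually runs


def double(x):
    calls.append(x)
    return x * 2


numbers = ToyRDD([1, 2, 3, 4])
doubled = numbers.map(double)            # transformation: nothing runs yet
calls_before_action = len(calls)
result = doubled.filter(lambda x: x > 2).collect()  # action: runs the steps

print(calls_before_action, result)
```

In this sketch double() is not called until collect() asks for a result, which is the same pattern you will see when printing an RDD versus calling take() on it.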

Immutability

You may be wondering why we couldn't just split each String in place instead of creating a new object daily_show. In Python, we could have modified the collection element-by-element in place without returning and assigning to a new object.

RDD objects are immutable and their values can't be changed once the object is created. In Python, List objects and Dictionary objects are mutable, which means we can change the object's values, while Tuple objects are immutable. The only way to modify a Tuple object in Python is to create a new Tuple object with the necessary updates. Spark utilizes the immutability of RDDs for speed gains, and the mechanics of that are outside the scope of this lesson.
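
The List versus Tuple difference is easy to see in a few lines of plain Python. The variable names below are made up for this illustration.

```python
row = ["1999", "actor"]      # a list is mutable
row[1] = row[1].upper()      # modified in place

frozen = ("1999", "actor")   # a tuple is immutable
try:
    frozen[1] = "ACTOR"      # item assignment is not allowed on tuples
    tuple_was_modified = True
except TypeError:
    tuple_was_modified = False

# To "change" a tuple, build a new one; the original stays as it was.
updated = (frozen[0], frozen[1].upper())

print(row, frozen, updated, tuple_was_modified)
```

Working with RDDs follows the second pattern: each transformation gives you a new RDD and leaves the old one alone.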

ReduceByKey()

We would like to get a histogram, or a tally, of the number of guests in each year the show has been running. If daily_show were a List of Lists, we could write the following Python code to achieve this result:

tally = dict()
for line in daily_show:
  year = line[0]  # the first field of each row is the year
  if year in tally:
    tally[year] = tally[year] + 1
  else:
    tally[year] = 1

The keys in tally will be unique Year values and the values will be the number of lines in the dataset that contained that value.

If we want to achieve this same result using Spark, we will have to use a Map step followed by a ReduceByKey step.


tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(tally)

PythonRDD[156] at RDD at PythonRDD.scala:43

Explanation

You may notice that printing tally didn't return the histogram we were hoping for. Because of lazy evaluation, PySpark delayed executing the map and reduceByKey steps until we actually need the result. Before we use take() to preview the first few elements in tally, we'll walk through the code we just wrote.

daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y)

During the map step, we used a lambda function to create a tuple consisting of:

  • key: x[0], the first value in the List
  • value: 1, the int

Our high level strategy was to create a tuple with the key representing the Year and the value representing 1. After the map step, Spark will maintain in memory a list of tuples resembling the following:

('YEAR', 1)
('1999', 1)
('1999', 1)
('1999', 1)
('1999', 1)
...

and we'd like to reduce that down to:

('YEAR', 1)
('1999', 4)
...

reduceByKey(f) combines tuples with the same key using the function we specify f.
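
As a rough mental model, the combining step can be sketched in plain Python on a single machine. The helper reduce_by_key below is a hypothetical function written for this post, not Spark's implementation, and the sample pairs are illustrative. Spark performs the same kind of combining across partitions, which is why the function you pass should be associative and commutative, and why the order of the resulting tuples is not guaranteed.

```python
def reduce_by_key(pairs, f):
    """Combine the values of pairs that share a key using f(a, b)."""
    combined = {}
    for key, value in pairs:
        if key in combined:
            combined[key] = f(combined[key], value)
        else:
            combined[key] = value
    return list(combined.items())


# Illustrative (key, value) pairs like those produced by the map step.
pairs = [("YEAR", 1), ("1999", 1), ("1999", 1), ("1999", 1), ("1999", 1)]

tally_sketch = reduce_by_key(pairs, lambda x, y: x + y)
print(tally_sketch)
```

Each key ends up with a single value: the result of folding all of its values together with the function f.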

To see the results of these 2 steps, we'll use the take command, which forces lazy code to run immediately. Since tally is an RDD, we can't use Python's len function to know how many elements are in the collection and will instead need to use the RDD count() function.


tally.take(tally.count())

[('YEAR', 1),
 ('2013', 166),
 ('2001', 157),
 ('2004', 164),
 ('2000', 169),
 ('2015', 100),
 ('2010', 165),
 ('2006', 161),
 ('2014', 163),
 ('2003', 166),
 ('2002', 159),
 ('2011', 163),
 ('2012', 164),
 ('2008', 164),
 ('2007', 141),
 ('2005', 162),
 ('1999', 166),
 ('2009', 163)]

Filter

Unlike Pandas, Spark knows nothing about column headers and didn't set them aside. We need a way to get rid of the element:

('YEAR', 1)

from our collection. While you may be tempted to try to find a way to remove this element from the RDD, recall that RDD objects are immutable and can't be changed once created. The only way to remove that tuple is to create a new RDD object without that tuple.

Spark comes with a function filter(f) that allows us to create a new RDD from an existing one containing only the elements meeting our criteria. Specify a function f that returns a Boolean value, True or False, and the resulting RDD will consist of the elements for which the function evaluated to True. Read more about the filter function over at Spark's documentation.


def filter_year(line):
    if line[0] == 'YEAR':
        return False
    else:
        return True
filtered_daily_show = daily_show.filter(lambda line: filter_year(line))
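
The same idea can be tried on an ordinary Python list with the built-in filter(). This is only a sketch of the semantics: rows is a small sample copied from the earlier output, and is_data_row is a name made up for this example.

```python
# A few rows copied from the earlier output, held in an ordinary Python list.
rows = [
    ["YEAR", "GoogleKnowlege_Occupation", "Show", "Group", "Raw_Guest_List"],
    ["1999", "actor", "1/11/99", "Acting", "Michael J. Fox"],
    ["1999", "Comedian", "1/12/99", "Comedy", "Sandra Bernhard"],
]


def is_data_row(row):
    """Return True for every row except the header."""
    return row[0] != "YEAR"


# filter() keeps the elements for which the function returns True
# and leaves the original list unchanged.
data_rows = list(filter(is_data_row, rows))

print(len(rows), len(data_rows))
```

Note that a named function can be passed directly without wrapping it in a lambda, so daily_show.filter(filter_year) would behave the same as the version above.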

All together now

To flex Spark's muscles, we'll demonstrate how to chain together a series of data transformations into a pipeline and observe Spark managing everything in the background. Spark was written with this functionality in mind and is highly optimized for running tasks in succession. Previously, running lots of tasks in succession in Hadoop was incredibly time consuming since intermediate results needed to be written to disk and Hadoop wasn't aware of the full pipeline (optional reading if you're curious: http://qr.ae/RHWrT2).

Thanks to Spark's aggressive usage of memory (with disk used only as a backup and for specific tasks) and its well architected core, Spark is able to improve significantly on Hadoop's turnaround time. In the following code block, we'll filter out guests with no profession listed, lowercase each profession, generate a histogram of professions, and output the first 5 tuples in the histogram.


filtered_daily_show.filter(lambda line: line[1] != '') \
                   .map(lambda line: (line[1].lower(), 1)) \
                   .reduceByKey(lambda x,y: x+y) \
                   .take(5)

[('radio personality', 3),
 ('television writer', 1),
 ('american political figure', 2),
 ('former united states secretary of state', 6),
 ('mathematician', 1)]
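
If you'd like to trace the same chain of steps without Spark, here is a plain-Python sketch using generator expressions and collections.Counter. It is an analogue of the pipeline above, not a replacement for it. The first three rows come from the dataset preview, while the last two rows are made up purely to exercise the empty-profession filter and the lowercasing step.

```python
from collections import Counter

rows = [
    ["YEAR", "GoogleKnowlege_Occupation", "Show", "Group", "Raw_Guest_List"],
    ["1999", "actor", "1/11/99", "Acting", "Michael J. Fox"],
    ["1999", "Comedian", "1/12/99", "Comedy", "Sandra Bernhard"],
    ["1999", "", "1/15/99", "NA", "Hypothetical Guest"],          # made-up row
    ["1999", "Actor", "1/18/99", "Acting", "Another Made-Up Guest"],  # made-up row
]

# Each step is lazy: a generator expression does no work until it is consumed.
no_header = (row for row in rows if row[0] != "YEAR")    # drop the header
with_job = (row for row in no_header if row[1] != "")    # drop empty professions
lowered = (row[1].lower() for row in with_job)           # normalize case

# Counter consumes the chain and plays the role of map + reduceByKey.
histogram = Counter(lowered)

print(histogram.most_common(5))
```

As in the Spark version, nothing is computed until the final step asks for a result, and no intermediate list is ever built.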

Next steps

We hope this lesson has whetted your appetite for Apache Spark and shown how PySpark lets us write familiar Python code while still taking advantage of distributed processing. When working with larger datasets, PySpark really shines since it blurs the line between doing data science locally on your own computer and doing data science using large amounts of distributed computing on the internet (also referred to as the cloud).

If you enjoyed this post, check out our Spark Course on Dataquest where we learn more about transformations & actions in Spark.

Srini Kadamati


Director of Content at Dataquest.io. Based in Austin, TX.
