last hacked on Jul 23, 2017

# Apache Spark ## Open-Source cluster-computing framework "*Apache Spark is an open-source, distributed processing system commonly used for big data workloads. Apache Spark utilizes in-memory caching and optimized execution for fast performance, and it supports general batch processing, streaming analytics, machine learning, graph databases, and ad hoc queries.*" - AWS article *Apache Spark on Amazon EMR* *Spark* was built as a response to *Hadoop*, which was the standard cluster-computing framework. *Hadoop* used the *MapReduce* concept, but people saw many drawbacks in the *MapReduce* work flow. Since *Spark* utilizes *Hadoop* clusters it does help to understand some concepts, since there is overlap in technologies used. ## Context: What is MapReduce? + Programming model and associated implementation for processing and generating big data sets with parrallel, distributed algorithms on a *Hadoop*-cluster. + Composed of a two important functionalities: + A `Map()` method which performs filtering, and sorting. + A `Reduce()` method that performs the summary operations. + Specializtion of the *split-apply-combine* methodology in data analysis. + Key features of *MapReduce* is scalability and fault-tolerance done for a variety of applications by optimizing the execution engine. + Processing can occur on data stored as *filesysme*, unstructured, or database, structured. + "Map" step: Worker nodes apply `map()` function to the local data, writes output to a temporary storage. A master node ensures that only one copy of redundant input data is processed. + "Shuffle" step: Workers nodes redistribute data based on output keys (produced by the `map()` function), such that all data belonging to one key is located on the same worker node. + "Reduce" step: Worker nodes process each group of output data, per key, in parallel. + Parallelism offers some possibility of recovering partial failure of servers during operation; if mapper or reducer fails, work can be rescheduled. ## Shortcomings of MapReduce + Forces your pipeline into a linear MapReduce dataflow, so it fails to integrate other workflows like join, filter, map-reduce-map. + Implementation through *Java* **API** would result on verbose and hard-to-maintain code + *MapReduce* execution not ideal for iterative algorithms within *Machine Learning* i.e. *Neural Networks*, *Random Forests* ## Spark Data Storage Structures For this section we will be breaking down the 3 main forms of data storage: + *Resilient Distributed Datasets* (**RDD**) + **Spark SQL** (we will refer to this as just **SQL**) + **DataFrames** (similar to those in **R** and **Python**) ## Resilient Distributed Datasets *Spark*'s main data storage structure are Resilient Distributed Datasets, or **RDD**'s which is a collection of elements that can be operated in parallel. A simple example of a distributed dataset by the documentation is as follows, utilizing `SparkContext`'s `parallelize` method. >>> data = [1, 2, 3, 4, 5] >>> distData = sc.parallelize(data) >>> distData.collect() [1, 2, 3, 4, 5] >>> distData.reduce(lambda a, b: a + b) 15 ### Manually setting Partitions An important parameter in the `parrallelize` method is # of *partitions* you would like to cut the dataset into. The documentation recommends 2-4 partitions for each **CPU** in the cluster. *Spark* will set the partitions automatically, but you can set it manually by setting it in the `parallelize` method as follows: >>> sc.parallelize(data, 10) ## Reading External Datasets **RDD**'s can be created from a plethora of outside data sources, Text files can be created using `SparkContext`'s `textFile` method. So analagous to `read.table` in **R** or 'read_table' in **Python**. Example of reading in a text file is as follows: >>> distFile = sc.textFile('data.txt') Important notes given by documentation: + For paths on local filesystem, the file must also be accessible at the same path on worker nodes. + `textFile` can open compressed files, and utilizes wildcards as well. So `textFile("/myProjects/*.txt")` and `textFile("myProjects/*.gz")` are valid syntax + `textFile` can take an optional second parameter which will control the number of partitions. By default, 1 partition for each block (block is 128MB in **HDFS**) ## Saving and Loading SequenceFiles >>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) >>> rdd.saveAsSequenceFile("test.txt") >>> sorted(sc.sequenceFile("test.txt").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')] # RDD Operations **RDD** has two types of operations: *transformations* and *actions*. + Transformations: creates a new dataset from an existing one. The `map` method is an example of a transformation + Actions: Returns a vlaue after running a computation on the dataset. *Transformations* are *lazy*, so they do not compute their results right away. They only compute when an action requires a result to be returned, making *Spark* run more efficiently. ### Basics To illustrate functionalities let's give a simple example >>> lines = sc.textFile("data.txt") >>> lineLengths = lines.map(lambda s: len(s)) >>> totalLength = lineLengths.reduce(lambda a, b: a + b) Let's look at what we're doing line by line: + First we are loading an external file using the `textFile` method + The dataset is not being loaded, lines is a pointer to the file + The second line defines `lineLengths` as the result of a `map` tranformation, in this case the length of the *strings* in our data file + `lineLengths` is *not* computed because of *laziness* + Lastly we `reduce`, an action, here *Spark* breaks the computation into tasks to run on separate machines, and each machine does the respective *map* and *reduce* functionality. So if we wanted to use `lineLengths` again we would add the following: >>> lineLengths.persist() before we utilized `reduce`, which would make `lineLenghts` to be saved in memory. ## Passing Functions to Spark *Spark*'s **API** relies on passing functions in the driver program to run on your clusters. The documentation recommends doing three ways: + Lambda expressions (common in **Python**) simple functions that can be written as an expression + Doing `def`'s inside the function calling into *Spark* + Top-level functions in a module Here is an example of a verbose function that could be written in a `lambda` function >>> def myFunc(s): ... words = s.split(" ") ... return len(words) ... >>> sc = SparkContext() >>> sc.textFile("data.txt").map(myFunc) ### Printing Elements On a single machine the following syntax would generate the desired result (printing out the elements in the **RDD**) `rdd.map(println)` or `rdd.foreach(println)` However when utilizing clusters, the previous syntax wouldn't work. However the following `rdd.collect().foreach(println)` is problematic in that it would cause the driver to run out of memory because `collect()` gets the entire **RDD** into one machine. A safer approach would be: >>> rdd.take(100).foreach(println) ### Key-Value Pairs Most *Spark* operations work on **RDD**'s containing any type of objects, there are special operations only available on **RDD**'s of key-value pairs. Most common are distributed "shuffle" operations, like grouping or aggregating the elements by a key. The following example provided by the documentation uses the `reduceByKey` operation on key-value pairs to count how many times each line of text occurs: >>> lines = sc.textFile("data.txt") >>> pairs = lines.map(lambda s: (s, 1)) >>> counts = pairs.reduceByKey(lambda a, b: a + b) ## RDD Persistence An important capability in *Spark* is *persisting* (or *caching*) a dataset in memory across operations. When persisting each node stores any partitions and reuses them in other actions on that dataset. Allowing future actions to be much faster, *caching* is an key tool when doing iterative algorithms. Marking an **RDD** to be persisted can be done using the `persist()` or `cache()` method. So for the first time it is computed in an action, it will be kept in memory on the nodes. *Spark*'s cache is fault-tolerant - if any partition is lost, it will be recomputed using the transformation that originally created it. More technical detail can be found [here](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) since this is a more advanced topic, it's better left to the pros to explain if you need more context. ### Manually Removing Data *Spark* monitors cache usage on each node and drops out old partitions in *least-recently-used* (**LRU**) fashion. *Least-recently-Used* is an algorithm that throws out data that was, as the name suggests, the least recently used. An example provided by [this website](http://mcicpc.cs.atu.edu/archives/2012/mcpc2012/lru/lru.html) is as follows: Say we have a cache that can hold up to five data pieces. So we access A, B, and C, once we do so they are stored in the cache with two empty spots. Next we access E and F so now they are stored as well with all spots being taken up. Suppose we access A again since A is cached the cache does not change, but it does count as a use so A is the most recently used data. Now if we wanted to cache F, up to here B has been the least recently used so that gets thrown out. <img src="http://mcicpc.cs.atu.edu/archives/2012/mcpc2012/lru/cache1.png"> <p style='text-align: center;'>First cache where we selected A, B, C to be accessed</p> <img src="http://mcicpc.cs.atu.edu/archives/2012/mcpc2012/lru/cache2.png"> <p style='text-align: center;'>The action done after we try to cache F, once we did the other operations. Examples for this section provided by the [Mid-Central USA Programming Contest](http://www.icpc-midcentral.us/index.html)</p> **However**, if you wanted to manually remove an **RDD** you can do the following >>> rdd.unpersist() # Conclusions So that concludes a quick crash course on basic **RDD** operations and getting familiar with how *Spark* operates. The next section will go over more data analysis. The series will most likely have two more sections pertaining to *basic data analysis* and *machine learning* applications. If you have any questions, suggestions, or contributions please feel free to let me know.


keep exploring!

back to all projects