Spark Examples

Spark is built around distributed datasets that support types of parallel operations: transformations, which are lazy and yield another distributed dataset (e.g., map, filter, and join), and actions, which force the computation of a dataset and return a result (e.g., count). The following examples show off some of the available operations and features. Several additional examples are distributed with Spark, both for core Spark (Scala examples, Java examples, Python examples) and streaming Spark (Scala examples, Java examples).

Text Search

In this example, we search through the error messages in a log file:

val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR"))
// Count all the errors
errors.count()
// Count errors mentioning MySQL
errors.filter(line => line.contains("MySQL")).count()
// Fetch the MySQL errors as an array of strings
errors.filter(line => line.contains("MySQL")).collect()

The red code fragments are Scala function literals (closures) that get passed automatically to the cluster. The blue ones are Spark operations.

In-Memory Text Search

Spark can cache datasets in memory to speed up reuse. In the example above, we can load just the error messages in RAM using:

errors.cache()

After the first action that uses errors, later ones will be much faster.

Word Count

In this example, we use a few more transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

Estimating Pi

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be π / 4, so we use this to get our estimate.

val count = spark.parallelize(1 to NUM_SAMPLES).map(i =>
  val x = Math.random
  val y = Math.random
  if (x*x + y*y < 1) 1.0 else 0.0
).reduce(_ + _)
println("Pi is roughly " + 4 * count / NUM_SAMPLES)

Logistic Regression

This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input data in RAM across iterations.

val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D) // current separating plane
for (i <- 1 to ITERATIONS) {
  val gradient = points.map(p =>
    (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
  
).reduce(_ + _)
  w -= gradient
}
println("Final separating plane: " + w)

Note that w gets shipped automatically to the cluster with every map call.

The graph below compares the performance of this Spark program against a Hadoop implementation on 30 GB of data on an 80-core cluster, showing the benefit of in-memory caching:

Logistic regression performance in Spark vs Hadoop