---
layout: global
title: Examples
type: "page singular"
navigation:
  weight: 4
  show: true
---

# Spark Examples

These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. You create a dataset from external data, then apply parallel operations to it. There are two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster.
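
As a minimal sketch of this distinction (not part of the examples below; it assumes a SparkContext bound to the name `spark`, as in the rest of this page), a transformation such as `map` only defines a new dataset, while an action such as `reduce` actually launches a job:

```python
nums = spark.parallelize([1, 2, 3, 4])      # distributed dataset from a local collection
squares = nums.map(lambda x: x * x)         # transformation: lazily defines a new dataset
total = squares.reduce(lambda a, b: a + b)  # action: runs the job and returns 30
```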

## Text Search

In this example, we search through the error messages in a log file:
**Python**

```python
file = spark.textFile("hdfs://...")
errors = file.filter(lambda line: "ERROR" in line)
# Count all the errors
errors.count()
# Count errors mentioning MySQL
errors.filter(lambda line: "MySQL" in line).count()
# Fetch the MySQL errors as an array of strings
errors.filter(lambda line: "MySQL" in line).collect()
```
**Scala**

```scala
val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR"))
// Count all the errors
errors.count()
// Count errors mentioning MySQL
errors.filter(line => line.contains("MySQL")).count()
// Fetch the MySQL errors as an array of strings
errors.filter(line => line.contains("MySQL")).collect()
```
**Java**

```java
JavaRDD<String> file = spark.textFile("hdfs://...");
JavaRDD<String> errors = file.filter(new Function<String, Boolean>() {
  public Boolean call(String s) { return s.contains("ERROR"); }
});
// Count all the errors
errors.count();
// Count errors mentioning MySQL
errors.filter(new Function<String, Boolean>() {
  public Boolean call(String s) { return s.contains("MySQL"); }
}).count();
// Fetch the MySQL errors as an array of strings
errors.filter(new Function<String, Boolean>() {
  public Boolean call(String s) { return s.contains("MySQL"); }
}).collect();
```

The lambda expressions and anonymous classes above are function literals (closures) that get passed automatically to the cluster; calls such as `textFile`, `filter`, `count`, and `collect` are Spark operations.
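
The same holds for named functions, not just inline lambdas. As a small sketch in Python (continuing the hypothetical `file` dataset above):

```python
# Any function can be passed to a transformation; Spark ships the closure
# (the function plus any variables it references) to the worker nodes.
def is_mysql_error(line):
    return "ERROR" in line and "MySQL" in line

file.filter(is_mysql_error).count()
```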

## In-Memory Text Search

Spark can cache datasets in memory to speed up reuse. In the example above, we can load just the error messages into RAM using:

**Python**

```python
errors.cache()
```

**Scala**

```scala
errors.cache()
```

**Java**

```java
errors.cache();
```

After the first action that uses `errors`, later ones will be much faster.
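
For example, a short sketch in Python (reusing `errors` from the text search example above):

```python
errors.cache()
# First action: scans the input and caches the filtered dataset in memory.
errors.count()
# Later actions reuse the in-memory copy instead of rereading the file.
errors.filter(lambda line: "MySQL" in line).count()
```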

## Word Count

In this example, we use a few more transformations to build a dataset of `(String, Int)` pairs called `counts` and then save it to a file.

**Python**

```python
file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
```
**Scala**

```scala
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
```
**Java**

```java
JavaRDD<String> file = spark.textFile("hdfs://...");
JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");
```
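
As a small follow-on sketch in Python (assuming the `counts` dataset built above), the pairs can also be pulled back to the driver to inspect the most frequent words, for example with `takeOrdered`:

```python
# Hypothetical follow-up: fetch the ten most frequent words from `counts`.
top10 = counts.takeOrdered(10, key=lambda pair: -pair[1])
for word, n in top10:
    print("%s: %d" % (word, n))
```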

## Estimating Pi

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and count how many fall inside the unit circle. Since the quarter circle has area π/4 and the square has area 1, that fraction should be approximately π/4, so multiplying it by 4 gives our estimate of π.

**Python**

```python
from random import random

def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = spark.parallelize(range(0, NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
```
**Scala**

```scala
val count = spark.parallelize(1 to NUM_SAMPLES).map { i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
```
**Java**

```java
// makeRange is a helper (not shown) that builds a List<Integer> of sample indices.
long count = spark.parallelize(makeRange(1, NUM_SAMPLES)).filter(new Function<Integer, Boolean>() {
  public Boolean call(Integer i) {
    double x = Math.random();
    double y = Math.random();
    return x*x + y*y < 1;
  }
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
```

## Logistic Regression

This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input in RAM across iterations.
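
The code below relies on a `parsePoint` helper (and, in the Java version, `ParsePoint` and `AddVectors` classes) that is not shown on this page. As a hypothetical Python sketch, assuming each input line holds a label followed by the feature values, it might look like:

```python
import numpy as np

# Hypothetical helper: the actual input format is not specified on this page.
# Assumes each line is "label f1 f2 ... fD" with the label in {-1, +1}.
class DataPoint(object):
    def __init__(self, x, y):
        self.x = x  # feature vector (numpy array)
        self.y = y  # label

def parsePoint(line):
    values = [float(v) for v in line.split(" ")]
    return DataPoint(np.array(values[1:]), values[0])
```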

**Python**

```python
import numpy
from math import exp

points = spark.textFile(...).map(parsePoint).cache()
w = numpy.random.ranf(size=D)  # current separating plane
for i in range(ITERATIONS):
    gradient = points.map(
        lambda p: (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
    ).reduce(lambda a, b: a + b)
    w -= gradient
print("Final separating plane: %s" % w)
```
**Scala**

```scala
val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D) // current separating plane
for (i <- 1 to ITERATIONS) {
  val gradient = points.map(p =>
    (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}
println("Final separating plane: " + w)
```
**Java**

```java
class ComputeGradient extends Function<DataPoint, Vector> {
  private Vector w;
  ComputeGradient(Vector w) { this.w = w; }
  public Vector call(DataPoint p) {
    return p.x.times(p.y * (1 / (1 + Math.exp(-p.y * w.dot(p.x))) - 1));
  }
}

// ParsePoint and AddVectors are helper classes (not shown).
JavaRDD<DataPoint> points = spark.textFile(...).map(new ParsePoint()).cache();
Vector w = Vector.random(D); // current separating plane
for (int i = 0; i < ITERATIONS; i++) {
  Vector gradient = points.map(new ComputeGradient(w)).reduce(new AddVectors());
  w = w.subtract(gradient);
}
System.out.println("Final separating plane: " + w);
```

Note that the current separating plane, `w`, gets shipped automatically to the cluster with every `map` call.

The graph below compares the running time per iteration of this Spark program against a Hadoop implementation on 100 GB of data on a 100-node cluster, showing the benefit of in-memory caching:

*Figure: Logistic regression performance in Spark vs. Hadoop*

## Additional Examples

Many additional examples are distributed with Spark:

* Basic Spark: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples), [Python examples](https://github.com/apache/spark/tree/master/examples/src/main/python)
* Spark Streaming: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)