---
layout: global
title: Examples
type: "page singular"
navigation:
  weight: 4
  show: true
---
# Spark Examples

These examples give a quick overview of the Spark API.
Spark is built on the concept of distributed datasets, which contain arbitrary Java or
Python objects. You create a dataset from external data, then apply parallel operations
to it. There are two types of operations: transformations, which define a new dataset based on
previous ones, and actions, which kick off a job to execute on a cluster.
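
For a concrete sense of the distinction, here is a minimal Python sketch (it reuses the `spark` context variable from the snippets below; the file path and length threshold are placeholders): the `filter` transformation only describes a new dataset, and no work happens until the `count` action runs.

```python
lines = spark.textFile("hdfs://...")                     # transformation: defines a dataset, nothing runs yet
long_lines = lines.filter(lambda line: len(line) > 80)   # transformation: still lazy
long_lines.count()                                       # action: launches a job on the cluster and returns a number
```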
## Text Search
In this example, we search through the error messages in a log file:
Python:

```python
text_file = spark.textFile("hdfs://...")
errors = text_file.filter(lambda line: "ERROR" in line)
# Count all the errors
errors.count()
# Count errors mentioning MySQL
errors.filter(lambda line: "MySQL" in line).count()
# Fetch the MySQL errors as a list of strings
errors.filter(lambda line: "MySQL" in line).collect()
```

Scala:

```scala
val textFile = spark.textFile("hdfs://...")
val errors = textFile.filter(line => line.contains("ERROR"))
// Count all the errors
errors.count()
// Count errors mentioning MySQL
errors.filter(line => line.contains("MySQL")).count()
// Fetch the MySQL errors as an array of strings
errors.filter(line => line.contains("MySQL")).collect()
```

Java:

```java
JavaRDD<String> textFile = spark.textFile("hdfs://...");
JavaRDD<String> errors = textFile.filter(new Function<String, Boolean>() {
  public Boolean call(String s) { return s.contains("ERROR"); }
});
// Count all the errors
errors.count();
// Count errors mentioning MySQL
errors.filter(new Function<String, Boolean>() {
  public Boolean call(String s) { return s.contains("MySQL"); }
}).count();
// Fetch the MySQL errors as a list of strings
errors.filter(new Function<String, Boolean>() {
  public Boolean call(String s) { return s.contains("MySQL"); }
}).collect();
```
The function literals (closures) passed to `filter` get shipped automatically to the cluster; `filter`, `count`, and `collect` are the Spark operations that use them.
## In-Memory Text Search
Spark can cache datasets in memory to speed up reuse. In the example above, we can keep just the error messages in RAM:
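
A one-line sketch, using the Python `errors` dataset defined above (the equivalent `cache()` call exists on the Scala and Java datasets):

```python
errors.cache()  # keep the error lines in memory once they are first computed
```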
After the first action that uses `errors`, later ones will be much faster.
## Word Count
In this example, we use a few more transformations to build a dataset of (String, Int) pairs called `counts` and then save it to a file.
Python:

```python
text_file = spark.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
```

Scala:

```scala
val textFile = spark.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
```

Java:

```java
JavaRDD<String> textFile = spark.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");
```
## Estimating Pi
Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle: we pick random points in the unit square ((0, 0) to (1, 1)) and count how many fall inside the unit circle. The fraction should approach π / 4, so we use it to get our estimate.
Python:

```python
from random import random

def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = spark.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
```

Scala:

```scala
val count = spark.parallelize(1 to NUM_SAMPLES).map { i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
```

Java:

```java
long count = spark.parallelize(makeRange(1, NUM_SAMPLES)).filter(new Function<Integer, Boolean>() {
  public Boolean call(Integer i) {
    double x = Math.random();
    double y = Math.random();
    return x*x + y*y < 1;
  }
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
```
## Logistic Regression
This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input in RAM across iterations.
Python:

```python
from math import exp
import numpy

points = spark.textFile(...).map(parsePoint).cache()
w = numpy.random.ranf(size = D)  # current separating plane
for i in range(ITERATIONS):
    gradient = points.map(
        lambda p: (1 / (1 + exp(-p.y*(w.dot(p.x)))) - 1) * p.y * p.x
    ).reduce(lambda a, b: a + b)
    w -= gradient
print "Final separating plane: %s" % w
```

Scala:

```scala
val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D)  // current separating plane
for (i <- 1 to ITERATIONS) {
  val gradient = points.map(p =>
    (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}
println("Final separating plane: " + w)
```

Java:

```java
class ComputeGradient extends Function<DataPoint, Vector> {
  private Vector w;
  ComputeGradient(Vector w) { this.w = w; }
  public Vector call(DataPoint p) {
    return p.x.times(p.y * (1 / (1 + Math.exp(-p.y * w.dot(p.x))) - 1));
  }
}

JavaRDD<DataPoint> points = spark.textFile(...).map(new ParsePoint()).cache();
Vector w = Vector.random(D);  // current separating plane
for (int i = 0; i < ITERATIONS; i++) {
  Vector gradient = points.map(new ComputeGradient(w)).reduce(new AddVectors());
  w = w.subtract(gradient);
}
System.out.println("Final separating plane: " + w);
```
Note that the current separating plane, `w`, gets shipped automatically to the cluster with every `map` call.
The graph below compares the running time per iteration of this Spark program against a Hadoop implementation on 100 GB of data on a 100-node cluster, showing the benefit of in-memory caching:
## Additional Examples
Many additional examples are distributed with Spark:
* Basic Spark: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples), [Python examples](https://github.com/apache/spark/tree/master/examples/src/main/python)
* Spark Streaming: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)