From 60c97ec11599b2339cea8bd0e005d29bfe8f5d6c Mon Sep 17 00:00:00 2001
From: Matei Alexandru Zaharia
Date: Sat, 3 May 2014 21:55:05 +0000
Subject: show Python examples first

---
 examples.md        | 124 ++++++++++++++++++++++++++---------------------
 index.md           |   8 ++--
 site/examples.html | 124 ++++++++++++++++++++++++++---------------------
 site/index.html    |   8 ++--
 4 files changed, 132 insertions(+), 132 deletions(-)

diff --git a/examples.md b/examples.md
index 71156ef80..ec381c124 100644
--- a/examples.md
+++ b/examples.md
@@ -19,12 +19,24 @@ previous ones, and actions, which kick off a job to execute on a cluste
In this example, we search through the error messages in a log file:
-
+
+
+ file = spark.textFile("hdfs://...")
+ errors = file.filter(lambda line: "ERROR" in line)
+ # Count all the errors
+ errors.count()
+ # Count errors mentioning MySQL
+ errors.filter(lambda line: "MySQL" in line).count()
+ # Fetch the MySQL errors as an array of strings
+ errors.filter(lambda line: "MySQL" in line).collect()
+
+
+
val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR"))
@@ -54,18 +66,6 @@ In this example, we search through the error messages in a log file:
}).collect();
-
-
- file = spark.textFile("hdfs://...")
- errors = file.filter(lambda line: "ERROR" in line)
- # Count all the errors
- errors.count()
- # Count errors mentioning MySQL
- errors.filter(lambda line: "MySQL" in line).count()
- # Fetch the MySQL errors as an array of strings
- errors.filter(lambda line: "MySQL" in line).collect()
-
-

The red code fragments are function literals (closures) that get passed automatically to the cluster. The blue ones are Spark operations.
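As a quick aside on what "passed automatically to the cluster" means, here is a minimal PySpark-style sketch, assuming the `errors` RDD from the example above and a hypothetical `pattern` variable: the lambda closes over `pattern`, so Spark serializes both the function and that captured value and sends them to the executors.

```python
# Minimal sketch (hypothetical `pattern`; `errors` is the RDD from the example above).
pattern = "MySQL"                                            # ordinary driver-side variable
mysql_errors = errors.filter(lambda line: pattern in line)   # the lambda captures `pattern`
mysql_errors.count()                                         # closure and captured value ship with the job
```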

@@ -75,24 +75,24 @@ In this example, we search through the error messages in a log file:

Spark can cache datasets in memory to speed up reuse. In the example above, we can load just the error messages in RAM using:

-
+
errors.cache()
-
+
- errors.cache();
+ errors.cache()
-
+
- errors.cache()
+ errors.cache();
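To see why this helps, a minimal sketch, assuming the `errors` RDD from the search example above: the first action computes and caches the dataset, and later actions reuse the in-memory copy instead of rereading the file.

```python
# Sketch: `errors` is the filtered RDD from the search example above.
errors.cache()                                        # mark the RDD to be kept in memory
errors.count()                                        # first action: computes and caches the data
errors.filter(lambda line: "MySQL" in line).count()   # later actions reuse the in-memory copy
```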
@@ -105,12 +105,21 @@ In this example, we search through the error messages in a log file:

In this example, we use a few more transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.

-
+
+
+ file = spark.textFile("hdfs://...")
+ counts = file.flatMap(lambda line: line.split(" ")) \
+              .map(lambda word: (word, 1)) \
+              .reduceByKey(lambda a, b: a + b)
+ counts.saveAsTextFile("hdfs://...")
+
+
+
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
@@ -134,15 +143,6 @@ In this example, we search through the error messages in a log file:
counts.saveAsTextFile("hdfs://...");
-
-
- file = spark.textFile("hdfs://...")
- counts = file.flatMap(lambda line: line.split(" ")) \
-              .map(lambda word: (word, 1)) \
-              .reduceByKey(lambda a, b: a + b)
- counts.saveAsTextFile("hdfs://...")
-
-
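Besides saving to HDFS, the resulting pairs can also be pulled back to the driver for a quick look; this is a small sketch assuming the `counts` RDD from the Python snippet above.

```python
# Sketch: pull a few (word, count) pairs from the `counts` RDD above back to the driver.
for word, n in counts.take(5):                 # take() returns a small list of pairs
    print "%s appears %d times" % (word, n)    # Python 2 print, matching the examples on this page
```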

Estimating Pi

@@ -150,12 +150,22 @@ In this example, we search through the error messages in a log file:

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be π / 4, so we use this to get our estimate.
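The π / 4 claim follows from a one-line area argument: a uniform point in the unit square lands inside the quarter of the unit circle with probability equal to that quarter circle's area.

```latex
% Fraction of darts inside the quarter disc of radius 1 within the unit square:
\Pr\left[x^2 + y^2 < 1\right]
  = \frac{\text{area of quarter disc}}{\text{area of unit square}}
  = \frac{\pi/4}{1},
\qquad\text{so}\qquad
\pi \approx 4 \cdot \frac{\mathit{count}}{\mathit{NUM\_SAMPLES}}.
```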

-
+
+
+ def sample(p):
+     x, y = random(), random()
+     return 1 if x*x + y*y < 1 else 0

+ count = spark.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
+              .reduce(lambda a, b: a + b)
+ print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
+
+
+
val count = spark.parallelize(1 to NUM_SAMPLES).map(i =>
  val x = Math.random()
@@ -177,16 +187,6 @@ In this example, we search through the error messages in a log file:
System.out.println("Pi is roughly " + 4 * count / NUM_SAMPLES);
-
-
- def sample(p):
-     x, y = random(), random()
-     return 1 if x*x + y*y < 1 else 0

- count = spark.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
-              .reduce(lambda a, b: a + b)
- print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
-
-

Logistic Regression

@@ -194,12 +194,24 @@ In this example, we search through the error messages in a log file:

This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input in RAM across iterations.
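For the expression inside the map call: with labels y_i in {−1, +1}, this is plain gradient descent on the logistic loss, whose gradient is exactly the per-point term being summed, followed by a unit-step update, as in the code below.

```latex
% Logistic loss over labeled points (y_i in {-1, +1}) and its gradient,
% which is the per-point term computed inside map():
L(w) = \sum_i \log\left(1 + e^{-y_i\, w \cdot x_i}\right),
\qquad
\nabla L(w) = \sum_i \left(\frac{1}{1 + e^{-y_i\, w \cdot x_i}} - 1\right) y_i\, x_i,
\qquad
w \leftarrow w - \nabla L(w).
```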

-
+
+
+ points = spark.textFile(...).map(parsePoint).cache()
+ w = numpy.random.ranf(size = D) # current separating plane
+ for i in range(ITERATIONS):
+     gradient = points.map(
+         lambda p: (1 / (1 + exp(-p.y*(w.dot(p.x)))) - 1) * p.y * p.x
+     ).reduce(lambda a, b: a + b)
+     w -= gradient
+ print "Final separating plane: %s" % w
+
+
+
val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D) // current separating plane
@@ -231,18 +243,6 @@ In this example, we search through the error messages in a log file:
System.out.println("Final separating plane: " + w);
-
-
- points = spark.textFile(...).map(parsePoint).cache()
- w = numpy.random.ranf(size = D) # current separating plane
- for i in range(ITERATIONS):
-     gradient = points.map(
-         lambda p: (1 / (1 + exp(-p.y*(w.dot(p.x)))) - 1) * p.y * p.x
-     ).reduce(lambda a, b: a + b)
-     w -= gradient
- print "Final separating plane: %s" % w
-
-

Note that the current separating plane, w, gets shipped automatically to the cluster with every map call.

diff --git a/index.md b/index.md
index 7f0b21cf9..109e20ca0 100644
--- a/index.md
+++ b/index.md
@@ -54,11 +54,11 @@ navigation:
file = spark.textFile("hdfs://...")
 
- file.flatMap(line => line.split(" "))
-     .map(word => (word, 1))
-     .reduceByKey(_ + _)
+ file.flatMap(lambda line: line.split())
+     .map(lambda word: (word, 1))
+     .reduceByKey(lambda a, b: a+b)
- Word count in Spark
+ Word count in Spark's Python API