From 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 1 Jan 2013 13:52:14 -0800 Subject: Minor documentation and style fixes for PySpark. --- .../scala/spark/api/python/PythonPartitioner.scala | 4 +- .../main/scala/spark/api/python/PythonRDD.scala | 43 +++++++++++----- docs/index.md | 8 ++- docs/python-programming-guide.md | 3 +- pyspark/examples/kmeans.py | 13 +++-- pyspark/examples/logistic_regression.py | 57 ++++++++++++++++++++++ pyspark/examples/lr.py | 57 ---------------------- pyspark/examples/pi.py | 5 +- pyspark/examples/tc.py | 49 ------------------- pyspark/examples/transitive_closure.py | 50 +++++++++++++++++++ pyspark/examples/wordcount.py | 4 +- pyspark/pyspark/__init__.py | 13 ++++- 12 files changed, 172 insertions(+), 134 deletions(-) create mode 100755 pyspark/examples/logistic_regression.py delete mode 100755 pyspark/examples/lr.py delete mode 100644 pyspark/examples/tc.py create mode 100644 pyspark/examples/transitive_closure.py diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala index 2c829508e5..648d9402b0 100644 --- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala @@ -17,9 +17,9 @@ private[spark] class PythonPartitioner(override val numPartitions: Int) extends val hashCode = { if (key.isInstanceOf[Array[Byte]]) { Arrays.hashCode(key.asInstanceOf[Array[Byte]]) - } - else + } else { key.hashCode() + } } val mod = hashCode % numPartitions if (mod < 0) { diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index dc48378fdc..19a039e330 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -13,8 +13,12 @@ import spark.rdd.PipedRDD private[spark] class PythonRDD[T: ClassManifest]( - parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String], - preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) + parent: RDD[T], + command: Seq[String], + envVars: java.util.Map[String, String], + preservePartitoning: Boolean, + pythonExec: String, + broadcastVars: java.util.List[Broadcast[Array[Byte]]]) extends RDD[Array[Byte]](parent.context) { // Similar to Runtime.exec(), if we are given a single string, split it into words @@ -38,8 +42,8 @@ private[spark] class PythonRDD[T: ClassManifest]( // Add the environmental variables to the process. val currentEnvVars = pb.environment() - envVars.foreach { - case (variable, value) => currentEnvVars.put(variable, value) + for ((variable, value) <- envVars) { + currentEnvVars.put(variable, value) } val proc = pb.start() @@ -116,6 +120,10 @@ private[spark] class PythonRDD[T: ClassManifest]( val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } +/** + * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. + * This is used by PySpark's shuffle operations. + */ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Array[Byte], Array[Byte])](prev.context) { override def splits = prev.splits @@ -139,6 +147,16 @@ private[spark] object PythonRDD { * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream. * The data format is a 32-bit integer representing the pickled object's length (in bytes), * followed by the pickled data. + * + * Pickle module: + * + * http://docs.python.org/2/library/pickle.html + * + * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules: + * + * http://hg.python.org/cpython/file/2.6/Lib/pickle.py + * http://hg.python.org/cpython/file/2.6/Lib/pickletools.py + * * @param elem the object to write * @param dOut a data output stream */ @@ -201,15 +219,14 @@ private[spark] object PythonRDD { } private object Pickle { - def b(x: Int): Byte = x.asInstanceOf[Byte] - val PROTO: Byte = b(0x80) - val TWO: Byte = b(0x02) - val BINUNICODE : Byte = 'X' - val STOP : Byte = '.' - val TUPLE2 : Byte = b(0x86) - val EMPTY_LIST : Byte = ']' - val MARK : Byte = '(' - val APPENDS : Byte = 'e' + val PROTO: Byte = 0x80.toByte + val TWO: Byte = 0x02.toByte + val BINUNICODE: Byte = 'X' + val STOP: Byte = '.' + val TUPLE2: Byte = 0x86.toByte + val EMPTY_LIST: Byte = ']' + val MARK: Byte = '(' + val APPENDS: Byte = 'e' } private class ExtractValue extends spark.api.java.function.Function[(Array[Byte], diff --git a/docs/index.md b/docs/index.md index 33ab58a962..848b585333 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,7 +8,7 @@ TODO(andyk): Rewrite to make the Java API a first class part of the story. {% endcomment %} Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. -It provides clean, language-integrated APIs in Scala, Java, and Python, with a rich array of parallel operators. +It provides clean, language-integrated APIs in [Scala](scala-programming-guide.html), [Java](java-programming-guide.html), and [Python](python-programming-guide.html), with a rich array of parallel operators. Spark can run on top of the [Apache Mesos](http://incubator.apache.org/mesos/) cluster manager, [Hadoop YARN](http://hadoop.apache.org/docs/r2.0.1-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html), Amazon EC2, or without an independent resource manager ("standalone mode"). @@ -61,6 +61,11 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`). * [Java Programming Guide](java-programming-guide.html): using Spark from Java * [Python Programming Guide](python-programming-guide.html): using Spark from Python +**API Docs:** + +* [Java/Scala (Scaladoc)](api/core/index.html) +* [Python (Epydoc)](api/pyspark/index.html) + **Deployment guides:** * [Running Spark on Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes @@ -73,7 +78,6 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`). * [Configuration](configuration.html): customize Spark via its configuration system * [Tuning Guide](tuning.html): best practices to optimize performance and memory use -* API Docs: [Java/Scala (Scaladoc)](api/core/index.html) and [Python (Epydoc)](api/pyspark/index.html) * [Bagel](bagel-programming-guide.html): an implementation of Google's Pregel on Spark * [Contributing to Spark](contributing-to-spark.html) diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index b7c747f905..d88d4eb42d 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -17,8 +17,7 @@ There are a few key differences between the Python and Scala APIs: * Python is dynamically typed, so RDDs can hold objects of different types. * PySpark does not currently support the following Spark features: - Accumulators - - Special functions on RRDs of doubles, such as `mean` and `stdev` - - Approximate jobs / functions, such as `countApprox` and `sumApprox`. + - Special functions on RDDs of doubles, such as `mean` and `stdev` - `lookup` - `mapPartitionsWithSplit` - `persist` at storage levels other than `MEMORY_ONLY` diff --git a/pyspark/examples/kmeans.py b/pyspark/examples/kmeans.py index 9cc366f03c..ad2be21178 100644 --- a/pyspark/examples/kmeans.py +++ b/pyspark/examples/kmeans.py @@ -1,18 +1,21 @@ +""" +This example requires numpy (http://www.numpy.org/) +""" import sys -from pyspark.context import SparkContext -from numpy import array, sum as np_sum +import numpy as np +from pyspark import SparkContext def parseVector(line): - return array([float(x) for x in line.split(' ')]) + return np.array([float(x) for x in line.split(' ')]) def closestPoint(p, centers): bestIndex = 0 closest = float("+inf") for i in range(len(centers)): - tempDist = np_sum((p - centers[i]) ** 2) + tempDist = np.sum((p - centers[i]) ** 2) if tempDist < closest: closest = tempDist bestIndex = i @@ -41,7 +44,7 @@ if __name__ == "__main__": newPoints = pointStats.map( lambda (x, (y, z)): (x, y / z)).collect() - tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints) + tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints) for (x, y) in newPoints: kPoints[x] = y diff --git a/pyspark/examples/logistic_regression.py b/pyspark/examples/logistic_regression.py new file mode 100755 index 0000000000..f13698a86f --- /dev/null +++ b/pyspark/examples/logistic_regression.py @@ -0,0 +1,57 @@ +""" +This example requires numpy (http://www.numpy.org/) +""" +from collections import namedtuple +from math import exp +from os.path import realpath +import sys + +import numpy as np +from pyspark import SparkContext + + +N = 100000 # Number of data points +D = 10 # Number of dimensions +R = 0.7 # Scaling factor +ITERATIONS = 5 +np.random.seed(42) + + +DataPoint = namedtuple("DataPoint", ['x', 'y']) +from lr import DataPoint # So that DataPoint is properly serialized + + +def generateData(): + def generatePoint(i): + y = -1 if i % 2 == 0 else 1 + x = np.random.normal(size=D) + (y * R) + return DataPoint(x, y) + return [generatePoint(i) for i in range(N)] + + +if __name__ == "__main__": + if len(sys.argv) == 1: + print >> sys.stderr, \ + "Usage: PythonLR []" + exit(-1) + sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)]) + slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2 + points = sc.parallelize(generateData(), slices).cache() + + # Initialize w to a random value + w = 2 * np.random.ranf(size=D) - 1 + print "Initial w: " + str(w) + + def add(x, y): + x += y + return x + + for i in range(1, ITERATIONS + 1): + print "On iteration %i" % i + + gradient = points.map(lambda p: + (1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x + ).reduce(add) + w -= gradient + + print "Final w: " + str(w) diff --git a/pyspark/examples/lr.py b/pyspark/examples/lr.py deleted file mode 100755 index 5fca0266b8..0000000000 --- a/pyspark/examples/lr.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -This example requires numpy (http://www.numpy.org/) -""" -from collections import namedtuple -from math import exp -from os.path import realpath -import sys - -import numpy as np -from pyspark.context import SparkContext - - -N = 100000 # Number of data points -D = 10 # Number of dimensions -R = 0.7 # Scaling factor -ITERATIONS = 5 -np.random.seed(42) - - -DataPoint = namedtuple("DataPoint", ['x', 'y']) -from lr import DataPoint # So that DataPoint is properly serialized - - -def generateData(): - def generatePoint(i): - y = -1 if i % 2 == 0 else 1 - x = np.random.normal(size=D) + (y * R) - return DataPoint(x, y) - return [generatePoint(i) for i in range(N)] - - -if __name__ == "__main__": - if len(sys.argv) == 1: - print >> sys.stderr, \ - "Usage: PythonLR []" - exit(-1) - sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)]) - slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2 - points = sc.parallelize(generateData(), slices).cache() - - # Initialize w to a random value - w = 2 * np.random.ranf(size=D) - 1 - print "Initial w: " + str(w) - - def add(x, y): - x += y - return x - - for i in range(1, ITERATIONS + 1): - print "On iteration %i" % i - - gradient = points.map(lambda p: - (1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x - ).reduce(add) - w -= gradient - - print "Final w: " + str(w) diff --git a/pyspark/examples/pi.py b/pyspark/examples/pi.py index 348bbc5dce..127cba029b 100644 --- a/pyspark/examples/pi.py +++ b/pyspark/examples/pi.py @@ -1,13 +1,14 @@ import sys from random import random from operator import add -from pyspark.context import SparkContext + +from pyspark import SparkContext if __name__ == "__main__": if len(sys.argv) == 1: print >> sys.stderr, \ - "Usage: PythonPi []" + "Usage: PythonPi []" exit(-1) sc = SparkContext(sys.argv[1], "PythonPi") slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2 diff --git a/pyspark/examples/tc.py b/pyspark/examples/tc.py deleted file mode 100644 index 9630e72b47..0000000000 --- a/pyspark/examples/tc.py +++ /dev/null @@ -1,49 +0,0 @@ -import sys -from random import Random -from pyspark.context import SparkContext - -numEdges = 200 -numVertices = 100 -rand = Random(42) - - -def generateGraph(): - edges = set() - while len(edges) < numEdges: - src = rand.randrange(0, numEdges) - dst = rand.randrange(0, numEdges) - if src != dst: - edges.add((src, dst)) - return edges - - -if __name__ == "__main__": - if len(sys.argv) == 1: - print >> sys.stderr, \ - "Usage: PythonTC []" - exit(-1) - sc = SparkContext(sys.argv[1], "PythonTC") - slices = sys.argv[2] if len(sys.argv) > 2 else 2 - tc = sc.parallelize(generateGraph(), slices).cache() - - # Linear transitive closure: each round grows paths by one edge, - # by joining the graph's edges with the already-discovered paths. - # e.g. join the path (y, z) from the TC with the edge (x, y) from - # the graph to obtain the path (x, z). - - # Because join() joins on keys, the edges are stored in reversed order. - edges = tc.map(lambda (x, y): (y, x)) - - oldCount = 0L - nextCount = tc.count() - while True: - oldCount = nextCount - # Perform the join, obtaining an RDD of (y, (z, x)) pairs, - # then project the result to obtain the new (x, z) paths. - new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a)) - tc = tc.union(new_edges).distinct().cache() - nextCount = tc.count() - if nextCount == oldCount: - break - - print "TC has %i edges" % tc.count() diff --git a/pyspark/examples/transitive_closure.py b/pyspark/examples/transitive_closure.py new file mode 100644 index 0000000000..73f7f8fbaf --- /dev/null +++ b/pyspark/examples/transitive_closure.py @@ -0,0 +1,50 @@ +import sys +from random import Random + +from pyspark import SparkContext + +numEdges = 200 +numVertices = 100 +rand = Random(42) + + +def generateGraph(): + edges = set() + while len(edges) < numEdges: + src = rand.randrange(0, numEdges) + dst = rand.randrange(0, numEdges) + if src != dst: + edges.add((src, dst)) + return edges + + +if __name__ == "__main__": + if len(sys.argv) == 1: + print >> sys.stderr, \ + "Usage: PythonTC []" + exit(-1) + sc = SparkContext(sys.argv[1], "PythonTC") + slices = sys.argv[2] if len(sys.argv) > 2 else 2 + tc = sc.parallelize(generateGraph(), slices).cache() + + # Linear transitive closure: each round grows paths by one edge, + # by joining the graph's edges with the already-discovered paths. + # e.g. join the path (y, z) from the TC with the edge (x, y) from + # the graph to obtain the path (x, z). + + # Because join() joins on keys, the edges are stored in reversed order. + edges = tc.map(lambda (x, y): (y, x)) + + oldCount = 0L + nextCount = tc.count() + while True: + oldCount = nextCount + # Perform the join, obtaining an RDD of (y, (z, x)) pairs, + # then project the result to obtain the new (x, z) paths. + new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a)) + tc = tc.union(new_edges).distinct().cache() + nextCount = tc.count() + if nextCount == oldCount: + break + + print "TC has %i edges" % tc.count() diff --git a/pyspark/examples/wordcount.py b/pyspark/examples/wordcount.py index 8365c070e8..857160624b 100644 --- a/pyspark/examples/wordcount.py +++ b/pyspark/examples/wordcount.py @@ -1,6 +1,8 @@ import sys from operator import add -from pyspark.context import SparkContext + +from pyspark import SparkContext + if __name__ == "__main__": if len(sys.argv) < 3: diff --git a/pyspark/pyspark/__init__.py b/pyspark/pyspark/__init__.py index 8f8402b62b..1ab360a666 100644 --- a/pyspark/pyspark/__init__.py +++ b/pyspark/pyspark/__init__.py @@ -1,9 +1,20 @@ +""" +PySpark is a Python API for Spark. + +Public classes: + + - L{SparkContext} + Main entry point for Spark functionality. + - L{RDD} + A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. +""" import sys import os sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "pyspark/lib/py4j0.7.egg")) from pyspark.context import SparkContext +from pyspark.rdd import RDD -__all__ = ["SparkContext"] +__all__ = ["SparkContext", "RDD"] -- cgit v1.2.3