author     Josh Rosen <joshrosen@eecs.berkeley.edu>  2013-01-01 13:52:14 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>  2013-01-01 13:52:14 -0800
commit     170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (patch)
tree       da3df59e2262dac4b381227d5bc712502249d746
parent     6f6a6b79c4c3f3555f8ff427c91e714d02afe8fa (diff)
Minor documentation and style fixes for PySpark.
-rw-r--r--  core/src/main/scala/spark/api/python/PythonPartitioner.scala                    4
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala                           43
-rw-r--r--  docs/index.md                                                                   8
-rw-r--r--  docs/python-programming-guide.md                                                3
-rw-r--r--  pyspark/examples/kmeans.py                                                     13
-rwxr-xr-x  pyspark/examples/logistic_regression.py (renamed from pyspark/examples/lr.py)   4
-rw-r--r--  pyspark/examples/pi.py                                                          5
-rw-r--r--  pyspark/examples/transitive_closure.py (renamed from pyspark/examples/tc.py)    5
-rw-r--r--  pyspark/examples/wordcount.py                                                   4
-rw-r--r--  pyspark/pyspark/__init__.py                                                    13
10 files changed, 70 insertions, 32 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 2c829508e5..648d9402b0 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -17,9 +17,9 @@ private[spark] class PythonPartitioner(override val numPartitions: Int) extends
val hashCode = {
if (key.isInstanceOf[Array[Byte]]) {
Arrays.hashCode(key.asInstanceOf[Array[Byte]])
- }
- else
+ } else {
key.hashCode()
+ }
}
val mod = hashCode % numPartitions
if (mod < 0) {
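
The special case being restyled above exists because keys arriving from Python are pickled Array[Byte] values, and Java arrays hash by identity rather than by content. A minimal standalone Scala sketch (HashSketch is an illustrative name, not part of this commit) showing why Arrays.hashCode is needed:

import java.util.Arrays

// Sketch: two equal byte arrays get different default (identity-based) hash codes,
// so a partitioner must hash their contents instead.
object HashSketch {
  def main(args: Array[String]): Unit = {
    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3)
    println(a.hashCode == b.hashCode)                   // false: identity hash
    println(Arrays.hashCode(a) == Arrays.hashCode(b))   // true: content hash
  }
}
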
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index dc48378fdc..19a039e330 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -13,8 +13,12 @@ import spark.rdd.PipedRDD
private[spark] class PythonRDD[T: ClassManifest](
- parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
- preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
+ parent: RDD[T],
+ command: Seq[String],
+ envVars: java.util.Map[String, String],
+ preservePartitoning: Boolean,
+ pythonExec: String,
+ broadcastVars: java.util.List[Broadcast[Array[Byte]]])
extends RDD[Array[Byte]](parent.context) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -38,8 +42,8 @@ private[spark] class PythonRDD[T: ClassManifest](
// Add the environmental variables to the process.
val currentEnvVars = pb.environment()
- envVars.foreach {
- case (variable, value) => currentEnvVars.put(variable, value)
+ for ((variable, value) <- envVars) {
+ currentEnvVars.put(variable, value)
}
val proc = pb.start()
@@ -116,6 +120,10 @@ private[spark] class PythonRDD[T: ClassManifest](
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}
+/**
+ * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
+ * This is used by PySpark's shuffle operations.
+ */
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Array[Byte], Array[Byte])](prev.context) {
override def splits = prev.splits
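
The new doc comment describes PairwiseRDD's role; its compute method is outside this hunk. A hypothetical sketch (PairingSketch and toPairs are illustrative names only), assuming keys and values arrive as consecutive serialized elements that must be regrouped into pairs:

// Hypothetical sketch, not the PairwiseRDD implementation:
// regroup an alternating key/value byte stream into (key, value) pairs.
object PairingSketch {
  def toPairs(flat: Iterator[Array[Byte]]): Iterator[(Array[Byte], Array[Byte])] =
    flat.grouped(2).collect { case Seq(k, v) => (k, v) }

  def main(args: Array[String]): Unit = {
    val flat = Iterator("k1", "v1", "k2", "v2").map(_.getBytes("UTF-8"))
    toPairs(flat).foreach { case (k, v) =>
      println(new String(k, "UTF-8") + " -> " + new String(v, "UTF-8"))
    }
  }
}
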
@@ -139,6 +147,16 @@ private[spark] object PythonRDD {
* Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
* The data format is a 32-bit integer representing the pickled object's length (in bytes),
* followed by the pickled data.
+ *
+ * Pickle module:
+ *
+ * http://docs.python.org/2/library/pickle.html
+ *
+ * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
+ *
+ * http://hg.python.org/cpython/file/2.6/Lib/pickle.py
+ * http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
+ *
* @param elem the object to write
* @param dOut a data output stream
*/
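
The format described in this comment, a 32-bit length prefix followed by the payload, can be produced with a plain DataOutputStream. A minimal sketch, assuming the payload bytes already hold a pickle (FramingSketch and writeRecord are illustrative names, not part of this commit):

import java.io.{ByteArrayOutputStream, DataOutputStream}

// Minimal sketch of the framing described above: a 32-bit (big-endian, as written
// by DataOutputStream.writeInt) length prefix followed by the raw pickled bytes.
object FramingSketch {
  def writeRecord(dOut: DataOutputStream, pickled: Array[Byte]): Unit = {
    dOut.writeInt(pickled.length)  // 4-byte length prefix
    dOut.write(pickled)            // payload
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    writeRecord(out, "hello".getBytes("UTF-8"))  // stand-in for pickled bytes
    out.flush()
    println(buf.size())  // 4 + 5 = 9 bytes
  }
}
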
@@ -201,15 +219,14 @@ private[spark] object PythonRDD {
}
private object Pickle {
- def b(x: Int): Byte = x.asInstanceOf[Byte]
- val PROTO: Byte = b(0x80)
- val TWO: Byte = b(0x02)
- val BINUNICODE : Byte = 'X'
- val STOP : Byte = '.'
- val TUPLE2 : Byte = b(0x86)
- val EMPTY_LIST : Byte = ']'
- val MARK : Byte = '('
- val APPENDS : Byte = 'e'
+ val PROTO: Byte = 0x80.toByte
+ val TWO: Byte = 0x02.toByte
+ val BINUNICODE: Byte = 'X'
+ val STOP: Byte = '.'
+ val TUPLE2: Byte = 0x86.toByte
+ val EMPTY_LIST: Byte = ']'
+ val MARK: Byte = '('
+ val APPENDS: Byte = 'e'
}
private class ExtractValue extends spark.api.java.function.Function[(Array[Byte],
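
The constants in the Pickle object above are raw pickle protocol-2 opcode bytes that get concatenated by hand. A minimal sketch (PickleSketch is an illustrative name) showing that even a short opcode sequence, PROTO 2 followed by EMPTY_LIST and STOP, forms bytes that Python's pickle.loads reads back as an empty list:

import java.io.ByteArrayOutputStream

// Minimal sketch: hand-assemble a protocol-2 pickle of an empty list.
// The resulting bytes (0x80 0x02 ']' '.') load in Python as [] via pickle.loads.
object PickleSketch {
  val PROTO: Byte = 0x80.toByte
  val TWO: Byte = 0x02.toByte
  val EMPTY_LIST: Byte = ']'
  val STOP: Byte = '.'

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    out.write(Array(PROTO, TWO, EMPTY_LIST, STOP))
    println(out.toByteArray.map(b => f"0x$b%02x").mkString(" "))  // 0x80 0x02 0x5d 0x2e
  }
}
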
diff --git a/docs/index.md b/docs/index.md
index 33ab58a962..848b585333 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -8,7 +8,7 @@ TODO(andyk): Rewrite to make the Java API a first class part of the story.
{% endcomment %}
Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter.
-It provides clean, language-integrated APIs in Scala, Java, and Python, with a rich array of parallel operators.
+It provides clean, language-integrated APIs in [Scala](scala-programming-guide.html), [Java](java-programming-guide.html), and [Python](python-programming-guide.html), with a rich array of parallel operators.
Spark can run on top of the [Apache Mesos](http://incubator.apache.org/mesos/) cluster manager,
[Hadoop YARN](http://hadoop.apache.org/docs/r2.0.1-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html),
Amazon EC2, or without an independent resource manager ("standalone mode").
@@ -61,6 +61,11 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
* [Java Programming Guide](java-programming-guide.html): using Spark from Java
* [Python Programming Guide](python-programming-guide.html): using Spark from Python
+**API Docs:**
+
+* [Java/Scala (Scaladoc)](api/core/index.html)
+* [Python (Epydoc)](api/pyspark/index.html)
+
**Deployment guides:**
* [Running Spark on Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
@@ -73,7 +78,6 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
* [Configuration](configuration.html): customize Spark via its configuration system
* [Tuning Guide](tuning.html): best practices to optimize performance and memory use
-* API Docs: [Java/Scala (Scaladoc)](api/core/index.html) and [Python (Epydoc)](api/pyspark/index.html)
* [Bagel](bagel-programming-guide.html): an implementation of Google's Pregel on Spark
* [Contributing to Spark](contributing-to-spark.html)
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index b7c747f905..d88d4eb42d 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -17,8 +17,7 @@ There are a few key differences between the Python and Scala APIs:
* Python is dynamically typed, so RDDs can hold objects of different types.
* PySpark does not currently support the following Spark features:
- Accumulators
- - Special functions on RRDs of doubles, such as `mean` and `stdev`
- - Approximate jobs / functions, such as `countApprox` and `sumApprox`.
+ - Special functions on RDDs of doubles, such as `mean` and `stdev`
- `lookup`
- `mapPartitionsWithSplit`
- `persist` at storage levels other than `MEMORY_ONLY`
diff --git a/pyspark/examples/kmeans.py b/pyspark/examples/kmeans.py
index 9cc366f03c..ad2be21178 100644
--- a/pyspark/examples/kmeans.py
+++ b/pyspark/examples/kmeans.py
@@ -1,18 +1,21 @@
+"""
+This example requires numpy (http://www.numpy.org/)
+"""
import sys
-from pyspark.context import SparkContext
-from numpy import array, sum as np_sum
+import numpy as np
+from pyspark import SparkContext
def parseVector(line):
- return array([float(x) for x in line.split(' ')])
+ return np.array([float(x) for x in line.split(' ')])
def closestPoint(p, centers):
bestIndex = 0
closest = float("+inf")
for i in range(len(centers)):
- tempDist = np_sum((p - centers[i]) ** 2)
+ tempDist = np.sum((p - centers[i]) ** 2)
if tempDist < closest:
closest = tempDist
bestIndex = i
@@ -41,7 +44,7 @@ if __name__ == "__main__":
newPoints = pointStats.map(
lambda (x, (y, z)): (x, y / z)).collect()
- tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
+ tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
for (x, y) in newPoints:
kPoints[x] = y
diff --git a/pyspark/examples/lr.py b/pyspark/examples/logistic_regression.py
index 5fca0266b8..f13698a86f 100755
--- a/pyspark/examples/lr.py
+++ b/pyspark/examples/logistic_regression.py
@@ -7,7 +7,7 @@ from os.path import realpath
import sys
import numpy as np
-from pyspark.context import SparkContext
+from pyspark import SparkContext
N = 100000 # Number of data points
@@ -32,7 +32,7 @@ def generateData():
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, \
- "Usage: PythonLR <host> [<slices>]"
+ "Usage: PythonLR <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
diff --git a/pyspark/examples/pi.py b/pyspark/examples/pi.py
index 348bbc5dce..127cba029b 100644
--- a/pyspark/examples/pi.py
+++ b/pyspark/examples/pi.py
@@ -1,13 +1,14 @@
import sys
from random import random
from operator import add
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, \
- "Usage: PythonPi <host> [<slices>]"
+ "Usage: PythonPi <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonPi")
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
diff --git a/pyspark/examples/tc.py b/pyspark/examples/transitive_closure.py
index 9630e72b47..73f7f8fbaf 100644
--- a/pyspark/examples/tc.py
+++ b/pyspark/examples/transitive_closure.py
@@ -1,6 +1,7 @@
import sys
from random import Random
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
numEdges = 200
numVertices = 100
@@ -20,7 +21,7 @@ def generateGraph():
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, \
- "Usage: PythonTC <host> [<slices>]"
+ "Usage: PythonTC <master> [<slices>]"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonTC")
slices = sys.argv[2] if len(sys.argv) > 2 else 2
diff --git a/pyspark/examples/wordcount.py b/pyspark/examples/wordcount.py
index 8365c070e8..857160624b 100644
--- a/pyspark/examples/wordcount.py
+++ b/pyspark/examples/wordcount.py
@@ -1,6 +1,8 @@
import sys
from operator import add
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
+
if __name__ == "__main__":
if len(sys.argv) < 3:
diff --git a/pyspark/pyspark/__init__.py b/pyspark/pyspark/__init__.py
index 8f8402b62b..1ab360a666 100644
--- a/pyspark/pyspark/__init__.py
+++ b/pyspark/pyspark/__init__.py
@@ -1,9 +1,20 @@
+"""
+PySpark is a Python API for Spark.
+
+Public classes:
+
+ - L{SparkContext<pyspark.context.SparkContext>}
+ Main entry point for Spark functionality.
+ - L{RDD<pyspark.rdd.RDD>}
+ A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+"""
import sys
import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "pyspark/lib/py4j0.7.egg"))
from pyspark.context import SparkContext
+from pyspark.rdd import RDD
-__all__ = ["SparkContext"]
+__all__ = ["SparkContext", "RDD"]