aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2012-12-27 17:55:33 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2012-12-27 18:04:10 -0800
commit85b8f2c64f0fc4be5645d8736629fc082cb3587b (patch)
treedba808b3c96d0241654b4b0c99ec0786ac2339dd
parent2d98fff0651af4d527f41ba50c01f453fa049464 (diff)
downloadspark-85b8f2c64f0fc4be5645d8736629fc082cb3587b.tar.gz
spark-85b8f2c64f0fc4be5645d8736629fc082cb3587b.tar.bz2
spark-85b8f2c64f0fc4be5645d8736629fc082cb3587b.zip
Add epydoc API documentation for PySpark.
-rw-r--r--docs/README.md8
-rwxr-xr-xdocs/_layouts/global.html10
-rw-r--r--docs/_plugins/copy_api_dirs.rb17
-rw-r--r--pyspark/epydoc.conf19
-rw-r--r--pyspark/pyspark/context.py24
-rw-r--r--pyspark/pyspark/rdd.py195
6 files changed, 254 insertions, 19 deletions
diff --git a/docs/README.md b/docs/README.md
index 092153070e..887f407f18 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -25,10 +25,12 @@ To mark a block of code in your markdown to be syntax highlighted by jekyll duri
// supported languages too.
{% endhighlight %}
-## Scaladoc
+## API Docs (Scaladoc and Epydoc)
You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory.
-When you run `jekyll` in the docs directory, it will also copy over the scala doc for the various Spark subprojects into the docs directory (and then also into the _site directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc.
+Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory.
-NOTE: To skip the step of building and copying over the scaladoc when you build the docs, run `SKIP_SCALADOC=1 jekyll`.
+When you run `jekyll` in the docs directory, it will also copy over the scaladoc for the various Spark subprojects into the docs directory (and then also into the _site directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the PySpark docs using [epydoc](http://epydoc.sourceforge.net/).
+
+NOTE: To skip the step of building and copying over the scaladoc when you build the docs, run `SKIP_SCALADOC=1 jekyll`. Similarly, `SKIP_EPYDOC=1 jekyll` will skip PySpark API doc generation.
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 41ad5242c9..43a5fa3e1c 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -49,8 +49,14 @@
<li><a href="java-programming-guide.html">Java</a></li>
</ul>
</li>
-
- <li><a href="api/core/index.html">API (Scaladoc)</a></li>
+
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown">API<b class="caret"></b></a>
+ <ul class="dropdown-menu">
+ <li><a href="api/core/index.html">Scala/Java (Scaladoc)</a></li>
+ <li><a href="api/pyspark/index.html">Python (Epydoc)</a></li>
+ </ul>
+ </li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index e61c105449..577f3ebe70 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -28,3 +28,20 @@ if ENV['SKIP_SCALADOC'] != '1'
cp_r(source + "/.", dest)
end
end
+
+if ENV['SKIP_EPYDOC'] != '1'
+ puts "Moving to pyspark directory and building epydoc."
+ cd("../pyspark")
+ puts `epydoc --config epydoc.conf`
+
+ puts "Moving back into docs dir."
+ cd("../docs")
+
+ puts "echo making directory pyspark"
+ mkdir_p "pyspark"
+
+ puts "cp -r ../pyspark/docs/. api/pyspark"
+ cp_r("../pyspark/docs/.", "api/pyspark")
+
+ cd("..")
+end
diff --git a/pyspark/epydoc.conf b/pyspark/epydoc.conf
new file mode 100644
index 0000000000..91ac984ba2
--- /dev/null
+++ b/pyspark/epydoc.conf
@@ -0,0 +1,19 @@
+[epydoc] # Epydoc section marker (required by ConfigParser)
+
+# Information about the project.
+name: PySpark
+url: http://spark-project.org
+
+# The list of modules to document. Modules can be named using
+# dotted names, module filenames, or package directory names.
+# This option may be repeated.
+modules: pyspark
+
+# Write html output to the directory "apidocs"
+output: html
+target: docs/
+
+private: no
+
+exclude: pyspark.cloudpickle pyspark.worker pyspark.join pyspark.serializers
+ pyspark.java_gateway pyspark.examples pyspark.shell
diff --git a/pyspark/pyspark/context.py b/pyspark/pyspark/context.py
index 032619693a..7758d3e375 100644
--- a/pyspark/pyspark/context.py
+++ b/pyspark/pyspark/context.py
@@ -11,6 +11,11 @@ from py4j.java_collections import ListConverter
class SparkContext(object):
+ """
+ Main entry point for Spark functionality. A SparkContext represents the
+ connection to a Spark cluster, and can be used to create L{RDD}s and
+ broadcast variables on that cluster.
+ """
gateway = launch_gateway()
jvm = gateway.jvm
@@ -36,10 +41,16 @@ class SparkContext(object):
self._jsc.stop()
def stop(self):
+ """
+ Shut down the SparkContext.
+ """
self._jsc.stop()
self._jsc = None
def parallelize(self, c, numSlices=None):
+ """
+ Distribute a local Python collection to form an RDD.
+ """
numSlices = numSlices or self.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
@@ -53,17 +64,30 @@ class SparkContext(object):
return RDD(jrdd, self)
def textFile(self, name, minSplits=None):
+ """
+ Read a text file from HDFS, a local file system (available on all
+ nodes), or any Hadoop-supported file system URI, and return it as an
+ RDD of Strings.
+ """
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jsc.textFile(name, minSplits)
return RDD(jrdd, self)
def union(self, rdds):
+ """
+ Build the union of a list of RDDs
+ """
first = rdds[0]._jrdd
rest = [x._jrdd for x in rdds[1:]]
rest = ListConverter().convert(rest, self.gateway._gateway_client)
return RDD(self._jsc.union(first, rest), self)
def broadcast(self, value):
+ """
+ Broadcast a read-only variable to the cluster, returning a C{Broadcast}
+ object for reading it in distributed functions. The variable will be
+ sent to each cluster only once.
+ """
jbroadcast = self._jsc.broadcast(bytearray(dump_pickle(value)))
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index d7081dffd2..5af105ef62 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -18,24 +18,50 @@ from pyspark.join import python_join, python_left_outer_join, \
from py4j.java_collections import ListConverter, MapConverter
+__all__ = ["RDD"]
+
+
class RDD(object):
+ """
+ A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+ Represents an immutable, partitioned collection of elements that can be
+ operated on in parallel.
+ """
def __init__(self, jrdd, ctx):
self._jrdd = jrdd
self.is_cached = False
self.ctx = ctx
+ @property
+ def context(self):
+ """
+ The L{SparkContext} that this RDD was created on.
+ """
+ return self.ctx
+
def cache(self):
+ """
+ Persist this RDD with the default storage level (C{MEMORY_ONLY}).
+ """
self.is_cached = True
self._jrdd.cache()
return self
+ # TODO persist(self, storageLevel)
+
def map(self, f, preservesPartitioning=False):
+ """
+ Return a new RDD containing the distinct elements in this RDD.
+ """
def func(iterator): return imap(f, iterator)
return PipelinedRDD(self, func, preservesPartitioning)
- def flatMap(self, f):
+ def flatMap(self, f, preservesPartitioning=False):
"""
+ Return a new RDD by first applying a function to all elements of this
+ RDD, and then flattening the results.
+
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
@@ -43,19 +69,25 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(iterator): return chain.from_iterable(imap(f, iterator))
- return self.mapPartitions(func)
+ return self.mapPartitions(func, preservesPartitioning)
- def mapPartitions(self, f):
+ def mapPartitions(self, f, preservesPartitioning=False):
"""
+ Return a new RDD by applying a function to each partition of this RDD.
+
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
"""
- return PipelinedRDD(self, f)
+ return PipelinedRDD(self, f, preservesPartitioning)
+
+ # TODO: mapPartitionsWithSplit
def filter(self, f):
"""
+ Return a new RDD containing only the elements that satisfy a predicate.
+
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
@@ -65,6 +97,8 @@ class RDD(object):
def distinct(self):
"""
+ Return a new RDD containing the distinct elements in this RDD.
+
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
"""
@@ -72,21 +106,25 @@ class RDD(object):
.reduceByKey(lambda x, _: x) \
.map(lambda (x, _): x)
- def sample(self, withReplacement, fraction, seed):
- jrdd = self._jrdd.sample(withReplacement, fraction, seed)
- return RDD(jrdd, self.ctx)
+ # TODO: sampling needs to be re-implemented due to Batch
+ #def sample(self, withReplacement, fraction, seed):
+ # jrdd = self._jrdd.sample(withReplacement, fraction, seed)
+ # return RDD(jrdd, self.ctx)
- def takeSample(self, withReplacement, num, seed):
- vals = self._jrdd.takeSample(withReplacement, num, seed)
- return [load_pickle(bytes(x)) for x in vals]
+ #def takeSample(self, withReplacement, num, seed):
+ # vals = self._jrdd.takeSample(withReplacement, num, seed)
+ # return [load_pickle(bytes(x)) for x in vals]
def union(self, other):
"""
+ Return the union of this RDD and another one.
+
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
- # Union of batched and unbatched RDDs:
+ Union of batched and unbatched RDDs (internal test):
+
>>> batchedRDD = sc.parallelize([Batch([1, 2, 3, 4, 5])])
>>> rdd.union(batchedRDD).collect()
[1, 1, 2, 3, 1, 2, 3, 4, 5]
@@ -95,6 +133,8 @@ class RDD(object):
def __add__(self, other):
"""
+ Return the union of this RDD and another one.
+
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
@@ -107,6 +147,9 @@ class RDD(object):
def glom(self):
"""
+ Return an RDD created by coalescing all elements within each partition
+ into a list.
+
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.glom().first()
[1, 2]
@@ -116,6 +159,10 @@ class RDD(object):
def cartesian(self, other):
"""
+ Return the Cartesian product of this RDD and another one, that is, the
+ RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
+ C{b} is in C{other}.
+
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
@@ -124,6 +171,8 @@ class RDD(object):
def groupBy(self, f, numSplits=None):
"""
+ Return an RDD of grouped items.
+
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
@@ -133,6 +182,8 @@ class RDD(object):
def pipe(self, command, env={}):
"""
+ Return an RDD created by piping elements to a forked external process.
+
>>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
['1', '2', '3']
"""
@@ -148,12 +199,17 @@ class RDD(object):
def foreach(self, f):
"""
+ Applies a function to all elements of this RDD.
+
>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
self.map(f).collect() # Force evaluation
def collect(self):
+ """
+ Return a list that contains all of the elements in this RDD.
+ """
picklesInJava = self._jrdd.rdd().collect()
return list(self._collect_array_through_file(picklesInJava))
@@ -176,6 +232,9 @@ class RDD(object):
def reduce(self, f):
"""
+ Reduces the elements of this RDD using the specified associative binary
+ operator.
+
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
@@ -198,9 +257,11 @@ class RDD(object):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
- value." The function op(t1, t2) is allowed to modify t1 and return it
+ value."
+
+ The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
- modify t2.
+ modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
@@ -218,6 +279,8 @@ class RDD(object):
def sum(self):
"""
+ Add up the elements in this RDD.
+
>>> sc.parallelize([1.0, 2.0, 3.0]).sum()
6.0
"""
@@ -225,6 +288,8 @@ class RDD(object):
def count(self):
"""
+ Return the number of elements in this RDD.
+
>>> sc.parallelize([2, 3, 4]).count()
3
>>> sc.parallelize([Batch([2, 3, 4])]).count()
@@ -234,6 +299,9 @@ class RDD(object):
def countByValue(self):
"""
+ Return the count of each unique value in this RDD as a dictionary of
+ (value, count) pairs.
+
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]
"""
@@ -250,6 +318,12 @@ class RDD(object):
def take(self, num):
"""
+ Take the first num elements of the RDD.
+
+ This currently scans the partitions *one by one*, so it will be slow if
+ a lot of partitions are required. In that case, use L{collect} to get
+ the whole RDD instead.
+
>>> sc.parallelize([2, 3, 4]).take(2)
[2, 3]
"""
@@ -258,12 +332,18 @@ class RDD(object):
def first(self):
"""
+ Return the first element in this RDD.
+
>>> sc.parallelize([2, 3, 4]).first()
2
"""
return self.take(1)[0]
+ # TODO: add test and fix for use with Batch
def saveAsTextFile(self, path):
+ """
+ Save this RDD as a text file, using string representations of elements.
+ """
def func(iterator):
return (str(x).encode("utf-8") for x in iterator)
keyed = PipelinedRDD(self, func)
@@ -274,6 +354,8 @@ class RDD(object):
def collectAsMap(self):
"""
+ Return the key-value pairs in this RDD to the master as a dictionary.
+
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
@@ -284,6 +366,14 @@ class RDD(object):
def reduceByKey(self, func, numSplits=None):
"""
+ Merge the values for each key using an associative reduce function.
+
+ This will also perform the merging locally on each mapper before
+ sending results to a reducer, similarly to a "combiner" in MapReduce.
+
+ Output will be hash-partitioned with C{numSplits} splits, or the
+ default parallelism level if C{numSplits} is not specified.
+
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
@@ -293,6 +383,12 @@ class RDD(object):
def reduceByKeyLocally(self, func):
"""
+ Merge the values for each key using an associative reduce function, but
+ return the results immediately to the master as a dictionary.
+
+ This will also perform the merging locally on each mapper before
+ sending results to a reducer, similarly to a "combiner" in MapReduce.
+
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKeyLocally(add).items())
@@ -311,6 +407,9 @@ class RDD(object):
def countByKey(self):
"""
+ Count the number of elements for each key, and return the result to the
+ master as a dictionary.
+
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
@@ -319,6 +418,14 @@ class RDD(object):
def join(self, other, numSplits=None):
"""
+ Return an RDD containing all pairs of elements with matching keys in
+ C{self} and C{other}.
+
+ Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
+ (k, v1) is in C{self} and (k, v2) is in C{other}.
+
+ Performs a hash join across the cluster.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> sorted(x.join(y).collect())
@@ -328,6 +435,14 @@ class RDD(object):
def leftOuterJoin(self, other, numSplits=None):
"""
+ Perform a left outer join of C{self} and C{other}.
+
+ For each element (k, v) in C{self}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in C{other}, or the pair
+ (k, (v, None)) if no elements in other have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.leftOuterJoin(y).collect())
@@ -337,6 +452,14 @@ class RDD(object):
def rightOuterJoin(self, other, numSplits=None):
"""
+ Perform a right outer join of C{self} and C{other}.
+
+ For each element (k, w) in C{other}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
+ if no elements in C{self} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(y.rightOuterJoin(x).collect())
@@ -344,8 +467,11 @@ class RDD(object):
"""
return python_right_outer_join(self, other, numSplits)
+ # TODO: add option to control map-side combining
def partitionBy(self, numSplits, hashFunc=hash):
"""
+ Return a copy of the RDD partitioned using the specified partitioner.
+
>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
>>> set(sets[0]).intersection(set(sets[1]))
@@ -371,9 +497,27 @@ class RDD(object):
jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
return RDD(jrdd, self.ctx)
+ # TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numSplits=None):
"""
+ Generic function to combine the elements for each key using a custom
+ set of aggregation functions.
+
+ Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
+ type" C. Note that V and C can be different -- for example, one might
+ group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
+
+ Users provide three functions:
+
+ - C{createCombiner}, which turns a V into a C (e.g., creates
+ a one-element list)
+ - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
+ a list)
+ - C{mergeCombiners}, to combine two C's into a single one.
+
+ In addition, users can control the partitioning of the output RDD.
+
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def f(x): return x
>>> def add(a, b): return a + str(b)
@@ -402,8 +546,12 @@ class RDD(object):
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
+ # TODO: support variant with custom partitioner
def groupByKey(self, numSplits=None):
"""
+ Group the values for each key in the RDD into a single sequence.
+ Hash-partitions the resulting RDD with into numSplits partitions.
+
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]
@@ -422,20 +570,39 @@ class RDD(object):
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numSplits)
+ # TODO: add tests
def flatMapValues(self, f):
+ """
+ Pass each value in the key-value pair RDD through a flatMap function
+ without changing the keys; this also retains the original RDD's
+ partitioning.
+ """
flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
- return self.flatMap(flat_map_fn)
+ return self.flatMap(flat_map_fn, preservesPartitioning=True)
def mapValues(self, f):
+ """
+ Pass each value in the key-value pair RDD through a map function
+ without changing the keys; this also retains the original RDD's
+ partitioning.
+ """
map_values_fn = lambda (k, v): (k, f(v))
return self.map(map_values_fn, preservesPartitioning=True)
# TODO: support varargs cogroup of several RDDs.
def groupWith(self, other):
+ """
+ Alias for cogroup.
+ """
return self.cogroup(other)
+ # TODO: add variant with custom parittioner
def cogroup(self, other, numSplits=None):
"""
+ For each key k in C{self} or C{other}, return a resulting RDD that
+ contains a tuple with the list of values for that key in C{self} as well
+ as C{other}.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())