author     Josh Rosen <joshrosen@eecs.berkeley.edu>   2012-12-27 17:55:33 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>   2012-12-27 18:04:10 -0800
commit     85b8f2c64f0fc4be5645d8736629fc082cb3587b
tree       dba808b3c96d0241654b4b0c99ec0786ac2339dd /pyspark
parent     2d98fff0651af4d527f41ba50c01f453fa049464
Add epydoc API documentation for PySpark.
Diffstat (limited to 'pyspark')
-rw-r--r--  pyspark/epydoc.conf          19
-rw-r--r--  pyspark/pyspark/context.py   24
-rw-r--r--  pyspark/pyspark/rdd.py      195
3 files changed, 224 insertions, 14 deletions
diff --git a/pyspark/epydoc.conf b/pyspark/epydoc.conf
new file mode 100644
index 0000000000..91ac984ba2
--- /dev/null
+++ b/pyspark/epydoc.conf
@@ -0,0 +1,19 @@
+[epydoc] # Epydoc section marker (required by ConfigParser)
+
+# Information about the project.
+name: PySpark
+url: http://spark-project.org
+
+# The list of modules to document. Modules can be named using
+# dotted names, module filenames, or package directory names.
+# This option may be repeated.
+modules: pyspark
+
+# Write html output to the directory "apidocs"
+output: html
+target: docs/
+
+private: no
+
+exclude: pyspark.cloudpickle pyspark.worker pyspark.join pyspark.serializers
+ pyspark.java_gateway pyspark.examples pyspark.shell
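The config above is all that is needed to build the API docs. A hedged sketch of driving the build from Python follows; it is illustrative only, and assumes epydoc 3.x is installed with its command-line tool (and its --config option) on the PATH, and that the command is run from the repository root so that the relative cwd value points at the pyspark directory containing this file:

    # Illustrative only: build the PySpark API docs from the new config file.
    # Assumes the `epydoc` CLI is installed and on the PATH.
    import subprocess
    subprocess.check_call(["epydoc", "--config", "epydoc.conf"], cwd="pyspark")

Per the target setting, the generated HTML lands under pyspark/docs/.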
diff --git a/pyspark/pyspark/context.py b/pyspark/pyspark/context.py
index 032619693a..7758d3e375 100644
--- a/pyspark/pyspark/context.py
+++ b/pyspark/pyspark/context.py
@@ -11,6 +11,11 @@ from py4j.java_collections import ListConverter
class SparkContext(object):
+ """
+ Main entry point for Spark functionality. A SparkContext represents the
+ connection to a Spark cluster, and can be used to create L{RDD}s and
+ broadcast variables on that cluster.
+ """
gateway = launch_gateway()
jvm = gateway.jvm
@@ -36,10 +41,16 @@ class SparkContext(object):
self._jsc.stop()
def stop(self):
+ """
+ Shut down the SparkContext.
+ """
self._jsc.stop()
self._jsc = None
def parallelize(self, c, numSlices=None):
+ """
+ Distribute a local Python collection to form an RDD.
+ """
numSlices = numSlices or self.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
@@ -53,17 +64,30 @@ class SparkContext(object):
return RDD(jrdd, self)
def textFile(self, name, minSplits=None):
+ """
+ Read a text file from HDFS, a local file system (available on all
+ nodes), or any Hadoop-supported file system URI, and return it as an
+ RDD of Strings.
+ """
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jsc.textFile(name, minSplits)
return RDD(jrdd, self)
def union(self, rdds):
+ """
+ Build the union of a list of RDDs.
+ """
first = rdds[0]._jrdd
rest = [x._jrdd for x in rdds[1:]]
rest = ListConverter().convert(rest, self.gateway._gateway_client)
return RDD(self._jsc.union(first, rest), self)
def broadcast(self, value):
+ """
+ Broadcast a read-only variable to the cluster, returning a C{Broadcast}
+ object for reading it in distributed functions. The variable will be
+ sent to each node only once.
+ """
jbroadcast = self._jsc.broadcast(bytearray(dump_pickle(value)))
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
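None of the new SparkContext docstrings carry a doctest yet, so a minimal usage sketch tying them together is shown below. It is illustrative only: it assumes the constructor's first two positional arguments are the master URL and a job name, that the Broadcast wrapper exposes the wrapped value as .value, and that README.md exists on the local filesystem.

    from pyspark.context import SparkContext

    sc = SparkContext("local", "API Docs Example")    # connect to a local "cluster"
    rdd = sc.parallelize([1, 2, 3, 4], 2)             # distribute a local collection
    lines = sc.textFile("README.md")                  # lines of a text file as an RDD of strings
    factor = sc.broadcast(10)                         # read-only variable shared with the workers
    print rdd.map(lambda x: x * factor.value).collect()   # [10, 20, 30, 40]
    print sc.union([rdd, rdd]).count()                # 8
    sc.stop()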
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index d7081dffd2..5af105ef62 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -18,24 +18,50 @@ from pyspark.join import python_join, python_left_outer_join, \
from py4j.java_collections import ListConverter, MapConverter
+__all__ = ["RDD"]
+
+
class RDD(object):
+ """
+ A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+ Represents an immutable, partitioned collection of elements that can be
+ operated on in parallel.
+ """
def __init__(self, jrdd, ctx):
self._jrdd = jrdd
self.is_cached = False
self.ctx = ctx
+ @property
+ def context(self):
+ """
+ The L{SparkContext} that this RDD was created on.
+ """
+ return self.ctx
+
def cache(self):
+ """
+ Persist this RDD with the default storage level (C{MEMORY_ONLY}).
+ """
self.is_cached = True
self._jrdd.cache()
return self
+ # TODO persist(self, storageLevel)
+
def map(self, f, preservesPartitioning=False):
+ """
+ Return a new RDD by applying a function to each element of this RDD.
+ """
def func(iterator): return imap(f, iterator)
return PipelinedRDD(self, func, preservesPartitioning)
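The map() docstring above has no doctest yet; a one-liner in the same style as the module's other doctests, assuming the sc they already rely on:

    >>> # sc is the SparkContext supplied to the module's doctests
    >>> sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
    [2, 4, 6]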
- def flatMap(self, f):
+ def flatMap(self, f, preservesPartitioning=False):
"""
+ Return a new RDD by first applying a function to all elements of this
+ RDD, and then flattening the results.
+
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
@@ -43,19 +69,25 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(iterator): return chain.from_iterable(imap(f, iterator))
- return self.mapPartitions(func)
+ return self.mapPartitions(func, preservesPartitioning)
- def mapPartitions(self, f):
+ def mapPartitions(self, f, preservesPartitioning=False):
"""
+ Return a new RDD by applying a function to each partition of this RDD.
+
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
"""
- return PipelinedRDD(self, f)
+ return PipelinedRDD(self, f, preservesPartitioning)
+
+ # TODO: mapPartitionsWithSplit
def filter(self, f):
"""
+ Return a new RDD containing only the elements that satisfy a predicate.
+
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
@@ -65,6 +97,8 @@ class RDD(object):
def distinct(self):
"""
+ Return a new RDD containing the distinct elements in this RDD.
+
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
"""
@@ -72,21 +106,25 @@ class RDD(object):
.reduceByKey(lambda x, _: x) \
.map(lambda (x, _): x)
- def sample(self, withReplacement, fraction, seed):
- jrdd = self._jrdd.sample(withReplacement, fraction, seed)
- return RDD(jrdd, self.ctx)
+ # TODO: sampling needs to be re-implemented due to Batch
+ #def sample(self, withReplacement, fraction, seed):
+ # jrdd = self._jrdd.sample(withReplacement, fraction, seed)
+ # return RDD(jrdd, self.ctx)
- def takeSample(self, withReplacement, num, seed):
- vals = self._jrdd.takeSample(withReplacement, num, seed)
- return [load_pickle(bytes(x)) for x in vals]
+ #def takeSample(self, withReplacement, num, seed):
+ # vals = self._jrdd.takeSample(withReplacement, num, seed)
+ # return [load_pickle(bytes(x)) for x in vals]
def union(self, other):
"""
+ Return the union of this RDD and another one.
+
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
- # Union of batched and unbatched RDDs:
+ Union of batched and unbatched RDDs (internal test):
+
>>> batchedRDD = sc.parallelize([Batch([1, 2, 3, 4, 5])])
>>> rdd.union(batchedRDD).collect()
[1, 1, 2, 3, 1, 2, 3, 4, 5]
@@ -95,6 +133,8 @@ class RDD(object):
def __add__(self, other):
"""
+ Return the union of this RDD and another one.
+
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
@@ -107,6 +147,9 @@ class RDD(object):
def glom(self):
"""
+ Return an RDD created by coalescing all elements within each partition
+ into a list.
+
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.glom().first()
[1, 2]
@@ -116,6 +159,10 @@ class RDD(object):
def cartesian(self, other):
"""
+ Return the Cartesian product of this RDD and another one, that is, the
+ RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
+ C{b} is in C{other}.
+
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
@@ -124,6 +171,8 @@ class RDD(object):
def groupBy(self, f, numSplits=None):
"""
+ Return an RDD of grouped items.
+
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
@@ -133,6 +182,8 @@ class RDD(object):
def pipe(self, command, env={}):
"""
+ Return an RDD created by piping elements to a forked external process.
+
>>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
['1', '2', '3']
"""
@@ -148,12 +199,17 @@ class RDD(object):
def foreach(self, f):
"""
+ Applies a function to all elements of this RDD.
+
>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
self.map(f).collect() # Force evaluation
def collect(self):
+ """
+ Return a list that contains all of the elements in this RDD.
+ """
picklesInJava = self._jrdd.rdd().collect()
return list(self._collect_array_through_file(picklesInJava))
@@ -176,6 +232,9 @@ class RDD(object):
def reduce(self, f):
"""
+ Reduces the elements of this RDD using the specified associative binary
+ operator.
+
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
@@ -198,9 +257,11 @@ class RDD(object):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
- value." The function op(t1, t2) is allowed to modify t1 and return it
+ value."
+
+ The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
- modify t2.
+ modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
@@ -218,6 +279,8 @@ class RDD(object):
def sum(self):
"""
+ Add up the elements in this RDD.
+
>>> sc.parallelize([1.0, 2.0, 3.0]).sum()
6.0
"""
@@ -225,6 +288,8 @@ class RDD(object):
def count(self):
"""
+ Return the number of elements in this RDD.
+
>>> sc.parallelize([2, 3, 4]).count()
3
>>> sc.parallelize([Batch([2, 3, 4])]).count()
@@ -234,6 +299,9 @@ class RDD(object):
def countByValue(self):
"""
+ Return the count of each unique value in this RDD as a dictionary of
+ (value, count) pairs.
+
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]
"""
@@ -250,6 +318,12 @@ class RDD(object):
def take(self, num):
"""
+ Take the first C{num} elements of the RDD.
+
+ This currently scans the partitions *one by one*, so it will be slow if
+ a lot of partitions are required. In that case, use L{collect} to get
+ the whole RDD instead.
+
>>> sc.parallelize([2, 3, 4]).take(2)
[2, 3]
"""
@@ -258,12 +332,18 @@ class RDD(object):
def first(self):
"""
+ Return the first element in this RDD.
+
>>> sc.parallelize([2, 3, 4]).first()
2
"""
return self.take(1)[0]
+ # TODO: add test and fix for use with Batch
def saveAsTextFile(self, path):
+ """
+ Save this RDD as a text file, using string representations of elements.
+ """
def func(iterator):
return (str(x).encode("utf-8") for x in iterator)
keyed = PipelinedRDD(self, func)
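saveAsTextFile() is the one action above without a doctest, so a hedged round-trip sketch follows. It assumes an existing sc and an output path that does not already exist; a reserved temp-file name is used purely to obtain a unique path, since Hadoop's output format refuses to overwrite an existing directory.

    # Illustrative only: save an RDD as text, then read the part files back.
    from glob import glob
    from tempfile import NamedTemporaryFile

    tempFile = NamedTemporaryFile(delete=True)
    tempFile.close()                                  # keep only the unique name
    sc.parallelize(["a", "b", "c"]).saveAsTextFile(tempFile.name)
    print sorted(open(name).read() for name in glob(tempFile.name + "/part-*"))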
@@ -274,6 +354,8 @@ class RDD(object):
def collectAsMap(self):
"""
+ Return the key-value pairs in this RDD to the master as a dictionary.
+
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
@@ -284,6 +366,14 @@ class RDD(object):
def reduceByKey(self, func, numSplits=None):
"""
+ Merge the values for each key using an associative reduce function.
+
+ This will also perform the merging locally on each mapper before
+ sending results to a reducer, similarly to a "combiner" in MapReduce.
+
+ Output will be hash-partitioned with C{numSplits} splits, or the
+ default parallelism level if C{numSplits} is not specified.
+
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
@@ -293,6 +383,12 @@ class RDD(object):
def reduceByKeyLocally(self, func):
"""
+ Merge the values for each key using an associative reduce function, but
+ return the results immediately to the master as a dictionary.
+
+ This will also perform the merging locally on each mapper before
+ sending results to a reducer, similarly to a "combiner" in MapReduce.
+
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKeyLocally(add).items())
@@ -311,6 +407,9 @@ class RDD(object):
def countByKey(self):
"""
+ Count the number of elements for each key, and return the result to the
+ master as a dictionary.
+
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
@@ -319,6 +418,14 @@ class RDD(object):
def join(self, other, numSplits=None):
"""
+ Return an RDD containing all pairs of elements with matching keys in
+ C{self} and C{other}.
+
+ Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
+ (k, v1) is in C{self} and (k, v2) is in C{other}.
+
+ Performs a hash join across the cluster.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> sorted(x.join(y).collect())
@@ -328,6 +435,14 @@ class RDD(object):
def leftOuterJoin(self, other, numSplits=None):
"""
+ Perform a left outer join of C{self} and C{other}.
+
+ For each element (k, v) in C{self}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for w in C{other}, or the pair
+ (k, (v, None)) if no elements in C{other} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.leftOuterJoin(y).collect())
@@ -337,6 +452,14 @@ class RDD(object):
def rightOuterJoin(self, other, numSplits=None):
"""
+ Perform a right outer join of C{self} and C{other}.
+
+ For each element (k, w) in C{other}, the resulting RDD will either
+ contain all pairs (k, (v, w)) for v in C{self}, or the pair (k, (None, w))
+ if no elements in C{self} have key k.
+
+ Hash-partitions the resulting RDD into the given number of partitions.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(y.rightOuterJoin(x).collect())
@@ -344,8 +467,11 @@ class RDD(object):
"""
return python_right_outer_join(self, other, numSplits)
+ # TODO: add option to control map-side combining
def partitionBy(self, numSplits, hashFunc=hash):
"""
+ Return a copy of the RDD partitioned using the specified partitioner.
+
>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
>>> set(sets[0]).intersection(set(sets[1]))
@@ -371,9 +497,27 @@ class RDD(object):
jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
return RDD(jrdd, self.ctx)
+ # TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numSplits=None):
"""
+ Generic function to combine the elements for each key using a custom
+ set of aggregation functions.
+
+ Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
+ type" C. Note that V and C can be different -- for example, one might
+ group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
+
+ Users provide three functions:
+
+ - C{createCombiner}, which turns a V into a C (e.g., creates
+ a one-element list)
+ - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
+ a list)
+ - C{mergeCombiners}, to combine two C's into a single one.
+
+ In addition, users can control the partitioning of the output RDD.
+
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def f(x): return x
>>> def add(a, b): return a + str(b)
@@ -402,8 +546,12 @@ class RDD(object):
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
+ # TODO: support variant with custom partitioner
def groupByKey(self, numSplits=None):
"""
+ Group the values for each key in the RDD into a single sequence.
+ Hash-partitions the resulting RDD into C{numSplits} partitions.
+
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]
@@ -422,20 +570,39 @@ class RDD(object):
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numSplits)
+ # TODO: add tests
def flatMapValues(self, f):
+ """
+ Pass each value in the key-value pair RDD through a flatMap function
+ without changing the keys; this also retains the original RDD's
+ partitioning.
+ """
flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
- return self.flatMap(flat_map_fn)
+ return self.flatMap(flat_map_fn, preservesPartitioning=True)
def mapValues(self, f):
+ """
+ Pass each value in the key-value pair RDD through a map function
+ without changing the keys; this also retains the original RDD's
+ partitioning.
+ """
map_values_fn = lambda (k, v): (k, f(v))
return self.map(map_values_fn, preservesPartitioning=True)
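Neither flatMapValues() nor mapValues() above has a doctest yet (the TODO notes that tests are still missing); a sketch of both in the usual doctest style, again assuming the doctest-provided sc:

    >>> # sc is the SparkContext supplied to the module's doctests
    >>> x = sc.parallelize([("a", [1, 2]), ("b", [3])])
    >>> sorted(x.mapValues(len).collect())
    [('a', 2), ('b', 1)]
    >>> sorted(x.flatMapValues(lambda v: v).collect())
    [('a', 1), ('a', 2), ('b', 3)]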
# TODO: support varargs cogroup of several RDDs.
def groupWith(self, other):
+ """
+ Alias for cogroup.
+ """
return self.cogroup(other)
+ # TODO: add variant with custom partitioner
def cogroup(self, other, numSplits=None):
"""
+ For each key k in C{self} or C{other}, return a resulting RDD that
+ contains a tuple with the list of values for that key in C{self} as well
+ as C{other}.
+
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())