author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-27 17:55:33 -0800
---|---|---
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-27 18:04:10 -0800
commit | 85b8f2c64f0fc4be5645d8736629fc082cb3587b (patch) |
tree | dba808b3c96d0241654b4b0c99ec0786ac2339dd /pyspark/pyspark/rdd.py |
parent | 2d98fff0651af4d527f41ba50c01f453fa049464 (diff) |
download | spark-85b8f2c64f0fc4be5645d8736629fc082cb3587b.tar.gz spark-85b8f2c64f0fc4be5645d8736629fc082cb3587b.tar.bz2 spark-85b8f2c64f0fc4be5645d8736629fc082cb3587b.zip |
Add epydoc API documentation for PySpark.
Diffstat (limited to 'pyspark/pyspark/rdd.py')
-rw-r--r-- | pyspark/pyspark/rdd.py | 195 |
1 file changed, 181 insertions, 14 deletions
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index d7081dffd2..5af105ef62 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -18,24 +18,50 @@ from pyspark.join import python_join, python_left_outer_join, \
 from py4j.java_collections import ListConverter, MapConverter
 
+__all__ = ["RDD"]
+
+
 class RDD(object):
+    """
+    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+    Represents an immutable, partitioned collection of elements that can be
+    operated on in parallel.
+    """
     def __init__(self, jrdd, ctx):
         self._jrdd = jrdd
         self.is_cached = False
         self.ctx = ctx
 
+    @property
+    def context(self):
+        """
+        The L{SparkContext} that this RDD was created on.
+        """
+        return self.ctx
+
     def cache(self):
+        """
+        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
+        """
         self.is_cached = True
         self._jrdd.cache()
         return self
 
+    # TODO persist(self, storageLevel)
+
     def map(self, f, preservesPartitioning=False):
+        """
+        Return a new RDD by applying a function to each element of this RDD.
+        """
         def func(iterator): return imap(f, iterator)
         return PipelinedRDD(self, func, preservesPartitioning)
 
-    def flatMap(self, f):
+    def flatMap(self, f, preservesPartitioning=False):
         """
+        Return a new RDD by first applying a function to all elements of this
+        RDD, and then flattening the results.
+
         >>> rdd = sc.parallelize([2, 3, 4])
         >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
         [1, 1, 1, 2, 2, 3]
@@ -43,19 +69,25 @@ class RDD(object):
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
         def func(iterator): return chain.from_iterable(imap(f, iterator))
-        return self.mapPartitions(func)
+        return self.mapPartitions(func, preservesPartitioning)
 
-    def mapPartitions(self, f):
+    def mapPartitions(self, f, preservesPartitioning=False):
         """
+        Return a new RDD by applying a function to each partition of this RDD.
+
         >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
         >>> def f(iterator): yield sum(iterator)
         >>> rdd.mapPartitions(f).collect()
         [3, 7]
         """
-        return PipelinedRDD(self, f)
+        return PipelinedRDD(self, f, preservesPartitioning)
+
+    # TODO: mapPartitionsWithSplit
 
     def filter(self, f):
         """
+        Return a new RDD containing only the elements that satisfy a predicate.
+
         >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
         >>> rdd.filter(lambda x: x % 2 == 0).collect()
         [2, 4]
@@ -65,6 +97,8 @@ class RDD(object):
 
     def distinct(self):
         """
+        Return a new RDD containing the distinct elements in this RDD.
+
         >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
         [1, 2, 3]
         """
@@ -72,21 +106,25 @@
                    .reduceByKey(lambda x, _: x) \
                    .map(lambda (x, _): x)
 
-    def sample(self, withReplacement, fraction, seed):
-        jrdd = self._jrdd.sample(withReplacement, fraction, seed)
-        return RDD(jrdd, self.ctx)
+    # TODO: sampling needs to be re-implemented due to Batch
+    #def sample(self, withReplacement, fraction, seed):
+    #    jrdd = self._jrdd.sample(withReplacement, fraction, seed)
+    #    return RDD(jrdd, self.ctx)
 
-    def takeSample(self, withReplacement, num, seed):
-        vals = self._jrdd.takeSample(withReplacement, num, seed)
-        return [load_pickle(bytes(x)) for x in vals]
+    #def takeSample(self, withReplacement, num, seed):
+    #    vals = self._jrdd.takeSample(withReplacement, num, seed)
+    #    return [load_pickle(bytes(x)) for x in vals]
 
     def union(self, other):
         """
+        Return the union of this RDD and another one.
+
         >>> rdd = sc.parallelize([1, 1, 2, 3])
         >>> rdd.union(rdd).collect()
         [1, 1, 2, 3, 1, 1, 2, 3]
 
-        # Union of batched and unbatched RDDs:
+        Union of batched and unbatched RDDs (internal test):
+
         >>> batchedRDD = sc.parallelize([Batch([1, 2, 3, 4, 5])])
         >>> rdd.union(batchedRDD).collect()
         [1, 1, 2, 3, 1, 2, 3, 4, 5]
@@ -95,6 +133,8 @@ class RDD(object):
 
     def __add__(self, other):
         """
+        Return the union of this RDD and another one.
+
         >>> rdd = sc.parallelize([1, 1, 2, 3])
         >>> (rdd + rdd).collect()
         [1, 1, 2, 3, 1, 1, 2, 3]
@@ -107,6 +147,9 @@
 
     def glom(self):
         """
+        Return an RDD created by coalescing all elements within each partition
+        into a list.
+
         >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
         >>> rdd.glom().first()
         [1, 2]
@@ -116,6 +159,10 @@
 
     def cartesian(self, other):
         """
+        Return the Cartesian product of this RDD and another one, that is, the
+        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
+        C{b} is in C{other}.
+
         >>> rdd = sc.parallelize([1, 2])
         >>> sorted(rdd.cartesian(rdd).collect())
         [(1, 1), (1, 2), (2, 1), (2, 2)]
@@ -124,6 +171,8 @@
 
     def groupBy(self, f, numSplits=None):
         """
+        Return an RDD of grouped items.
+
         >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
         >>> result = rdd.groupBy(lambda x: x % 2).collect()
         >>> sorted([(x, sorted(y)) for (x, y) in result])
@@ -133,6 +182,8 @@
 
     def pipe(self, command, env={}):
         """
+        Return an RDD created by piping elements to a forked external process.
+
         >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
         ['1', '2', '3']
         """
@@ -148,12 +199,17 @@
 
     def foreach(self, f):
         """
+        Applies a function to all elements of this RDD.
+
         >>> def f(x): print x
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
         """
         self.map(f).collect()  # Force evaluation
 
     def collect(self):
+        """
+        Return a list that contains all of the elements in this RDD.
+        """
         picklesInJava = self._jrdd.rdd().collect()
         return list(self._collect_array_through_file(picklesInJava))
 
@@ -176,6 +232,9 @@
 
     def reduce(self, f):
         """
+        Reduces the elements of this RDD using the specified associative binary
+        operator.
+
         >>> from operator import add
         >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
         15
@@ -198,9 +257,11 @@
         """
         Aggregate the elements of each partition, and then the results for all
         the partitions, using a given associative function and a neutral "zero
-        value." The function op(t1, t2) is allowed to modify t1 and return it
+        value."
+
+        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
         as its result value to avoid object allocation; however, it should not
-        modify t2.
+        modify C{t2}.
 
         >>> from operator import add
         >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
@@ -218,6 +279,8 @@
 
     def sum(self):
         """
+        Add up the elements in this RDD.
+
         >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
         6.0
         """
@@ -225,6 +288,8 @@
 
     def count(self):
         """
+        Return the number of elements in this RDD.
+
         >>> sc.parallelize([2, 3, 4]).count()
         3
         >>> sc.parallelize([Batch([2, 3, 4])]).count()
@@ -234,6 +299,9 @@
 
     def countByValue(self):
         """
+        Return the count of each unique value in this RDD as a dictionary of
+        (value, count) pairs.
+
         >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
         [(1, 2), (2, 3)]
         """
@@ -250,6 +318,12 @@
 
     def take(self, num):
         """
+        Take the first num elements of the RDD.
+
+        This currently scans the partitions *one by one*, so it will be slow if
+        a lot of partitions are required. In that case, use L{collect} to get
+        the whole RDD instead.
+
         >>> sc.parallelize([2, 3, 4]).take(2)
         [2, 3]
         """
@@ -258,12 +332,18 @@
 
     def first(self):
         """
+        Return the first element in this RDD.
+
         >>> sc.parallelize([2, 3, 4]).first()
         2
         """
         return self.take(1)[0]
 
+    # TODO: add test and fix for use with Batch
     def saveAsTextFile(self, path):
+        """
+        Save this RDD as a text file, using string representations of elements.
+        """
         def func(iterator):
             return (str(x).encode("utf-8") for x in iterator)
         keyed = PipelinedRDD(self, func)
@@ -274,6 +354,8 @@
 
     def collectAsMap(self):
         """
+        Return the key-value pairs in this RDD to the master as a dictionary.
+
         >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
         >>> m[1]
         2
@@ -284,6 +366,14 @@
 
     def reduceByKey(self, func, numSplits=None):
         """
+        Merge the values for each key using an associative reduce function.
+
+        This will also perform the merging locally on each mapper before
+        sending results to a reducer, similarly to a "combiner" in MapReduce.
+
+        Output will be hash-partitioned with C{numSplits} splits, or the
+        default parallelism level if C{numSplits} is not specified.
+
         >>> from operator import add
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.reduceByKey(add).collect())
@@ -293,6 +383,12 @@
 
     def reduceByKeyLocally(self, func):
         """
+        Merge the values for each key using an associative reduce function, but
+        return the results immediately to the master as a dictionary.
+
+        This will also perform the merging locally on each mapper before
+        sending results to a reducer, similarly to a "combiner" in MapReduce.
+
         >>> from operator import add
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.reduceByKeyLocally(add).items())
@@ -311,6 +407,9 @@
 
     def countByKey(self):
         """
+        Count the number of elements for each key, and return the result to the
+        master as a dictionary.
+
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.countByKey().items())
         [('a', 2), ('b', 1)]
@@ -319,6 +418,14 @@
 
     def join(self, other, numSplits=None):
         """
+        Return an RDD containing all pairs of elements with matching keys in
+        C{self} and C{other}.
+
+        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
+        (k, v1) is in C{self} and (k, v2) is in C{other}.
+
+        Performs a hash join across the cluster.
+
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2), ("a", 3)])
         >>> sorted(x.join(y).collect())
@@ -328,6 +435,14 @@
 
     def leftOuterJoin(self, other, numSplits=None):
         """
+        Perform a left outer join of C{self} and C{other}.
+
+        For each element (k, v) in C{self}, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in C{other}, or the pair
+        (k, (v, None)) if no elements in C{other} have key k.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
         >>> sorted(x.leftOuterJoin(y).collect())
@@ -337,6 +452,14 @@
 
     def rightOuterJoin(self, other, numSplits=None):
         """
+        Perform a right outer join of C{self} and C{other}.
+
+        For each element (k, w) in C{other}, the resulting RDD will either
+        contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
+        if no elements in C{self} have key k.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
         >>> sorted(y.rightOuterJoin(x).collect())
@@ -344,8 +467,11 @@
         """
         return python_right_outer_join(self, other, numSplits)
 
+    # TODO: add option to control map-side combining
     def partitionBy(self, numSplits, hashFunc=hash):
         """
+        Return a copy of the RDD partitioned using the specified partitioner.
+
         >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
         >>> sets = pairs.partitionBy(2).glom().collect()
         >>> set(sets[0]).intersection(set(sets[1]))
@@ -371,9 +497,27 @@
         jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
         return RDD(jrdd, self.ctx)
 
+    # TODO: add control over map-side aggregation
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                      numSplits=None):
         """
+        Generic function to combine the elements for each key using a custom
+        set of aggregation functions.
+
+        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
+        type" C.  Note that V and C can be different -- for example, one might
+        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
+
+        Users provide three functions:
+
+            - C{createCombiner}, which turns a V into a C (e.g., creates
+              a one-element list)
+            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
+              a list)
+            - C{mergeCombiners}, to combine two C's into a single one.
+
+        In addition, users can control the partitioning of the output RDD.
+
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> def f(x): return x
         >>> def add(a, b): return a + str(b)
@@ -402,8 +546,12 @@
             return combiners.iteritems()
         return shuffled.mapPartitions(_mergeCombiners)
 
+    # TODO: support variant with custom partitioner
     def groupByKey(self, numSplits=None):
         """
+        Group the values for each key in the RDD into a single sequence.
+        Hash-partitions the resulting RDD into numSplits partitions.
+
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(x.groupByKey().collect())
         [('a', [1, 1]), ('b', [1])]
@@ -422,20 +570,39 @@
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                  numSplits)
 
+    # TODO: add tests
     def flatMapValues(self, f):
+        """
+        Pass each value in the key-value pair RDD through a flatMap function
+        without changing the keys; this also retains the original RDD's
+        partitioning.
+        """
         flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
-        return self.flatMap(flat_map_fn)
+        return self.flatMap(flat_map_fn, preservesPartitioning=True)
 
     def mapValues(self, f):
+        """
+        Pass each value in the key-value pair RDD through a map function
+        without changing the keys; this also retains the original RDD's
+        partitioning.
+        """
         map_values_fn = lambda (k, v): (k, f(v))
         return self.map(map_values_fn, preservesPartitioning=True)
 
     # TODO: support varargs cogroup of several RDDs.
     def groupWith(self, other):
+        """
+        Alias for cogroup.
+        """
        return self.cogroup(other)
 
+    # TODO: add variant with custom partitioner
    def cogroup(self, other, numSplits=None):
         """
+        For each key k in C{self} or C{other}, return a resulting RDD that
+        contains a tuple with the list of values for that key in C{self} as well
+        as C{other}.
+
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
         >>> sorted(x.cogroup(y).collect())
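The docstrings added here use epydoc's epytext markup: C{...} marks inline code, L{...} produces a cross-reference link, and the >>> blocks render as, and can run as, doctests. A minimal sketch of the same conventions, using a hypothetical helper that is not part of this patch:

```python
def first_element(xs):
    """
    Return the first element of C{xs}.

    In epydoc's HTML output, C{xs} renders as inline code, and a reference
    like L{RDD.first} becomes a hyperlink to that method's documentation.

    >>> first_element([2, 3, 4])
    2
    """
    return xs[0]
```

HTML can then be generated with the epydoc command-line tool (for example, epydoc --html pyspark); the exact invocation used for Spark's published docs is not shown in this patch.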
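The new combineByKey docstring is the densest piece of documentation in the patch: values are first combined within each partition using createCombiner and mergeValue, then the per-partition results are merged with mergeCombiners. A plain-Python sketch of those semantics over an in-memory list of partitions (the local_combine_by_key helper is illustrative only, not PySpark API):

```python
def local_combine_by_key(partitions, createCombiner, mergeValue, mergeCombiners):
    # Stage 1: map-side combine within each partition.
    per_partition = []
    for part in partitions:
        combiners = {}
        for k, v in part:
            if k not in combiners:
                combiners[k] = createCombiner(v)
            else:
                combiners[k] = mergeValue(combiners[k], v)
        per_partition.append(combiners)
    # Stage 2: merge the per-partition combiners (the shuffle/reduce side).
    merged = {}
    for combiners in per_partition:
        for k, c in combiners.items():
            merged[k] = c if k not in merged else mergeCombiners(merged[k], c)
    return merged

# Turn (Int, Int) pairs into (Int, List[Int]), as the docstring suggests:
parts = [[(1, 2), (1, 3)], [(1, 4), (2, 5)]]
result = local_combine_by_key(parts,
                              lambda v: [v],          # createCombiner
                              lambda c, v: c + [v],   # mergeValue
                              lambda c1, c2: c1 + c2) # mergeCombiners
print(sorted(result.items()))
# [(1, [2, 3, 4]), (2, [5])]
```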
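Note that every doctest above references a free variable named sc, so they only pass when a live SparkContext is injected into the doctest globals. A minimal harness sketch; the SparkContext('local', ...) constructor arguments and the stop() call are assumptions about the PySpark API of this era, not something shown in the patch:

```python
import doctest

def _run_rdd_doctests():
    import pyspark.rdd
    from pyspark.context import SparkContext
    globs = pyspark.rdd.__dict__.copy()
    # Every doctest in rdd.py assumes an ambient SparkContext named 'sc'.
    globs['sc'] = SparkContext('local', 'RDDDocTests')
    failed, _ = doctest.testmod(pyspark.rdd, globs=globs)
    globs['sc'].stop()  # assumed to exist; otherwise let the process exit
    if failed:
        raise SystemExit(1)

if __name__ == '__main__':
    _run_rdd_doctests()
```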