Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 105 |
1 file changed, 64 insertions, 41 deletions
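At a glance, the patch below does four things: it adds the standard Apache license header, renames the numSplits parameter to numPartitions across the pair-RDD operations (groupBy, reduceByKey, join and friends), reimplements foreach() and take() on top of mapPartitions, and stops copying PYTHONPATH into the worker environment in PipelinedRDD. As a usage sketch of the renamed keyword, assuming a live SparkContext bound to sc as in the doctests below:

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add, numPartitions=2).collect())
    [('a', 2), ('b', 1)]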
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4cda6cf661..51c2cb9806 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 from base64 import standard_b64encode as b64enc
 import copy
 from collections import defaultdict
@@ -143,7 +160,7 @@ class RDD(object):
         >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
         [1, 2, 3]
         """
-        return self.map(lambda x: (x, "")) \
+        return self.map(lambda x: (x, None)) \
                    .reduceByKey(lambda x, _: x) \
                    .map(lambda (x, _): x)
 
@@ -215,7 +232,7 @@ class RDD(object):
                 yield pair
         return java_cartesian.flatMap(unpack_batches)
 
-    def groupBy(self, f, numSplits=None):
+    def groupBy(self, f, numPartitions=None):
         """
         Return an RDD of grouped items.
 
@@ -224,7 +241,7 @@ class RDD(object):
         >>> sorted([(x, sorted(y)) for (x, y) in result])
         [(0, [2, 8]), (1, [1, 1, 3, 5])]
         """
-        return self.map(lambda x: (f(x), x)).groupByKey(numSplits)
+        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
 
     def pipe(self, command, env={}):
         """
@@ -250,7 +267,11 @@ class RDD(object):
         >>> def f(x): print x
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
         """
-        self.map(f).collect()  # Force evaluation
+        def processPartition(iterator):
+            for x in iterator:
+                f(x)
+            yield None
+        self.mapPartitions(processPartition).collect()  # Force evaluation
 
     def collect(self):
         """
@@ -274,8 +295,8 @@ class RDD(object):
 
     def reduce(self, f):
         """
-        Reduces the elements of this RDD using the specified associative binary
-        operator.
+        Reduces the elements of this RDD using the specified commutative and
+        associative binary operator.
 
         >>> from operator import add
         >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
@@ -369,13 +390,16 @@ class RDD(object):
         >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
         [2, 3, 4, 5, 6]
         """
+        def takeUpToNum(iterator):
+            taken = 0
+            while taken < num:
+                yield next(iterator)
+                taken += 1
+        # Take only up to num elements from each partition we try
+        mapped = self.mapPartitions(takeUpToNum)
         items = []
-        for partition in range(self._jrdd.splits().size()):
-            iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
-            # Each item in the iterator is a string, Python object, batch of
-            # Python objects. Regardless, it is sufficient to take `num`
-            # of these objects in order to collect `num` Python objects:
-            iterator = iterator.take(num)
+        for partition in range(mapped._jrdd.splits().size()):
+            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
             items.extend(self._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break
@@ -399,7 +423,7 @@ class RDD(object):
         >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
         >>> from fileinput import input
         >>> from glob import glob
-        >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
+        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
         '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
         """
         def func(split, iterator):
@@ -422,22 +446,22 @@ class RDD(object):
         """
         return dict(self.collect())
 
-    def reduceByKey(self, func, numSplits=None):
+    def reduceByKey(self, func, numPartitions=None):
         """
         Merge the values for each key using an associative reduce function.
 
         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.
 
-        Output will be hash-partitioned with C{numSplits} splits, or the
-        default parallelism level if C{numSplits} is not specified.
+        Output will be hash-partitioned with C{numPartitions} partitions, or
+        the default parallelism level if C{numPartitions} is not specified.
 
         >>> from operator import add
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.reduceByKey(add).collect())
         [('a', 2), ('b', 1)]
         """
-        return self.combineByKey(lambda x: x, func, func, numSplits)
+        return self.combineByKey(lambda x: x, func, func, numPartitions)
 
     def reduceByKeyLocally(self, func):
         """
@@ -474,7 +498,7 @@ class RDD(object):
         """
         return self.map(lambda x: x[0]).countByValue()
 
-    def join(self, other, numSplits=None):
+    def join(self, other, numPartitions=None):
         """
         Return an RDD containing all pairs of elements with matching keys in
         C{self} and C{other}.
@@ -489,9 +513,9 @@ class RDD(object):
         >>> sorted(x.join(y).collect())
         [('a', (1, 2)), ('a', (1, 3))]
         """
-        return python_join(self, other, numSplits)
+        return python_join(self, other, numPartitions)
 
-    def leftOuterJoin(self, other, numSplits=None):
+    def leftOuterJoin(self, other, numPartitions=None):
         """
         Perform a left outer join of C{self} and C{other}.
 
@@ -506,9 +530,9 @@ class RDD(object):
         >>> sorted(x.leftOuterJoin(y).collect())
         [('a', (1, 2)), ('b', (4, None))]
         """
-        return python_left_outer_join(self, other, numSplits)
+        return python_left_outer_join(self, other, numPartitions)
 
-    def rightOuterJoin(self, other, numSplits=None):
+    def rightOuterJoin(self, other, numPartitions=None):
         """
         Perform a right outer join of C{self} and C{other}.
 
@@ -523,10 +547,10 @@ class RDD(object):
         >>> sorted(y.rightOuterJoin(x).collect())
         [('a', (2, 1)), ('b', (None, 4))]
         """
-        return python_right_outer_join(self, other, numSplits)
+        return python_right_outer_join(self, other, numPartitions)
 
     # TODO: add option to control map-side combining
-    def partitionBy(self, numSplits, partitionFunc=hash):
+    def partitionBy(self, numPartitions, partitionFunc=hash):
         """
         Return a copy of the RDD partitioned using the specified partitioner.
 
@@ -535,22 +559,22 @@ class RDD(object):
         >>> set(sets[0]).intersection(set(sets[1]))
         set([])
         """
-        if numSplits is None:
-            numSplits = self.ctx.defaultParallelism
+        if numPartitions is None:
+            numPartitions = self.ctx.defaultParallelism
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
-        # form the hash buckets in Python, transferring O(numSplits) objects
+        # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
         def add_shuffle_key(split, iterator):
             buckets = defaultdict(list)
             for (k, v) in iterator:
-                buckets[partitionFunc(k) % numSplits].append((k, v))
+                buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
                 yield str(split)
                 yield dump_pickle(Batch(items))
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
         pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
-        partitioner = self.ctx._jvm.PythonPartitioner(numSplits,
+        partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                       id(partitionFunc))
         jrdd = pairRDD.partitionBy(partitioner).values()
         rdd = RDD(jrdd, self.ctx)
@@ -561,7 +585,7 @@ class RDD(object):
 
     # TODO: add control over map-side aggregation
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-                     numSplits=None):
+                     numPartitions=None):
         """
         Generic function to combine the elements for each key using a custom
         set of aggregation functions.
@@ -586,8 +610,8 @@ class RDD(object):
         >>> sorted(x.combineByKey(str, add, add).collect())
         [('a', '11'), ('b', '1')]
         """
-        if numSplits is None:
-            numSplits = self.ctx.defaultParallelism
+        if numPartitions is None:
+            numPartitions = self.ctx.defaultParallelism
         def combineLocally(iterator):
             combiners = {}
             for (k, v) in iterator:
@@ -597,7 +621,7 @@ class RDD(object):
                     combiners[k] = mergeValue(combiners[k], v)
             return combiners.iteritems()
         locally_combined = self.mapPartitions(combineLocally)
-        shuffled = locally_combined.partitionBy(numSplits)
+        shuffled = locally_combined.partitionBy(numPartitions)
         def _mergeCombiners(iterator):
             combiners = {}
             for (k, v) in iterator:
@@ -609,10 +633,10 @@ class RDD(object):
         return shuffled.mapPartitions(_mergeCombiners)
 
     # TODO: support variant with custom partitioner
-    def groupByKey(self, numSplits=None):
+    def groupByKey(self, numPartitions=None):
         """
         Group the values for each key in the RDD into a single sequence.
-        Hash-partitions the resulting RDD with into numSplits partitions.
+        Hash-partitions the resulting RDD with into numPartitions partitions.
 
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(x.groupByKey().collect())
@@ -630,7 +654,7 @@ class RDD(object):
             return a + b
 
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                                 numSplits)
+                                 numPartitions)
 
     # TODO: add tests
     def flatMapValues(self, f):
@@ -659,7 +683,7 @@ class RDD(object):
         return self.cogroup(other)
 
     # TODO: add variant with custom parittioner
-    def cogroup(self, other, numSplits=None):
+    def cogroup(self, other, numPartitions=None):
         """
         For each key k in C{self} or C{other}, return a resulting RDD that
         contains a tuple with the list of values for that key in C{self} as well
@@ -670,7 +694,7 @@ class RDD(object):
         >>> sorted(x.cogroup(y).collect())
         [('a', ([1], [2])), ('b', ([4], []))]
         """
-        return python_cogroup(self, other, numSplits)
+        return python_cogroup(self, other, numPartitions)
 
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
@@ -732,9 +756,8 @@ class PipelinedRDD(RDD):
                 self.ctx._gateway._gateway_client)
         self.ctx._pickled_broadcast_vars.clear()
         class_manifest = self._prev_jrdd.classManifest()
-        env = copy.copy(self.ctx.environment)
-        env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
-        env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
+        env = MapConverter().convert(self.ctx.environment,
+                                     self.ctx._gateway._gateway_client)
         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
             pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
             broadcast_vars, self.ctx._javaAccumulator, class_manifest)
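The reworked take() above avoids shipping whole partitions to the driver: takeUpToNum truncates each partition to at most num elements, and the driver then pulls partitions one by one until it has gathered num items. (Under the Python 2 semantics this code targets, the StopIteration raised by next(iterator) on a short partition simply ends the generator.) A minimal, Spark-independent sketch of the same strategy, with plain lists standing in for partitions; the helper name and test data here are illustrative, not part of the patch:

    from itertools import islice

    def take_across_partitions(partitions, num):
        # Mirror takeUpToNum: keep at most `num` elements per partition,
        # then stop scanning partitions once `num` items have been collected.
        items = []
        for part in partitions:
            items.extend(islice(iter(part), num))
            if len(items) >= num:
                break
        return items[:num]

    print(take_across_partitions([[1, 2, 3], [4, 5], [6]], 4))  # [1, 2, 3, 4]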