diff options
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 182 |
1 files changed, 126 insertions, 56 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 58e1849cad..f87923e6fa 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -18,7 +18,7 @@ from base64 import standard_b64encode as b64enc import copy from collections import defaultdict -from itertools import chain, ifilter, imap, product +from itertools import chain, ifilter, imap import operator import os import sys @@ -27,9 +27,8 @@ from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread -from pyspark import cloudpickle -from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \ - read_from_pickle_file +from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ + BatchedSerializer, CloudPickleSerializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -48,12 +47,15 @@ class RDD(object): operated on in parallel. """ - def __init__(self, jrdd, ctx): + def __init__(self, jrdd, ctx, jrdd_deserializer): self._jrdd = jrdd self.is_cached = False self.is_checkpointed = False self.ctx = ctx - self._partitionFunc = None + self._jrdd_deserializer = jrdd_deserializer + + def __repr__(self): + return self._jrdd.toString() @property def context(self): @@ -117,8 +119,6 @@ class RDD(object): else: return None - # TODO persist(self, storageLevel) - def map(self, f, preservesPartitioning=False): """ Return a new RDD containing the distinct elements in this RDD. @@ -227,7 +227,7 @@ class RDD(object): total = num samples = self.sample(withReplacement, fraction, seed).collect() - + # If the first sample didn't turn out large enough, keep trying to take samples; # this shouldn't happen often because we use a big multiplier for their initial size. # See: scala/spark/RDD.scala @@ -249,7 +249,23 @@ class RDD(object): >>> rdd.union(rdd).collect() [1, 1, 2, 3, 1, 1, 2, 3] """ - return RDD(self._jrdd.union(other._jrdd), self.ctx) + if self._jrdd_deserializer == other._jrdd_deserializer: + rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, + self._jrdd_deserializer) + return rdd + else: + # These RDDs contain data in different serialized formats, so we + # must normalize them to the default serializer. + self_copy = self._reserialize() + other_copy = other._reserialize() + return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, + self.ctx.serializer) + + def _reserialize(self): + if self._jrdd_deserializer == self.ctx.serializer: + return self + else: + return self.map(lambda x: x, preservesPartitioning=True) def __add__(self, other): """ @@ -263,7 +279,55 @@ class RDD(object): raise TypeError return self.union(other) - # TODO: sort + def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x): + """ + Sorts this RDD, which is assumed to consist of (key, value) pairs. + + >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] + >>> sc.parallelize(tmp).sortByKey(True, 2).collect() + [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] + >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] + >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) + >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() + [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] + """ + if numPartitions is None: + numPartitions = self.ctx.defaultParallelism + + bounds = list() + + # first compute the boundary of each part via sampling: we want to partition + # the key-space into bins such that the bins have roughly the same + # number of (key, value) pairs falling into them + if numPartitions > 1: + rddSize = self.count() + maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner + fraction = min(maxSampleSize / max(rddSize, 1), 1.0) + + samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() + samples = sorted(samples, reverse=(not ascending), key=keyfunc) + + # we have numPartitions many parts but one of the them has + # an implicit boundary + for i in range(0, numPartitions - 1): + index = (len(samples) - 1) * (i + 1) / numPartitions + bounds.append(samples[index]) + + def rangePartitionFunc(k): + p = 0 + while p < len(bounds) and keyfunc(k) > bounds[p]: + p += 1 + if ascending: + return p + else: + return numPartitions-1-p + + def mapFunc(iterator): + yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k)) + + return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc) + .mapPartitions(mapFunc,preservesPartitioning=True) + .flatMap(lambda x: x, preservesPartitioning=True)) def glom(self): """ @@ -288,17 +352,9 @@ class RDD(object): [(1, 1), (1, 2), (2, 1), (2, 2)] """ # Due to batching, we can't use the Java cartesian method. - java_cartesian = RDD(self._jrdd.cartesian(other._jrdd), self.ctx) - def unpack_batches(pair): - (x, y) = pair - if type(x) == Batch or type(y) == Batch: - xs = x.items if type(x) == Batch else [x] - ys = y.items if type(y) == Batch else [y] - for pair in product(xs, ys): - yield pair - else: - yield pair - return java_cartesian.flatMap(unpack_batches) + deserializer = CartesianDeserializer(self._jrdd_deserializer, + other._jrdd_deserializer) + return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer) def groupBy(self, f, numPartitions=None): """ @@ -345,8 +401,8 @@ class RDD(object): """ Return a list that contains all of the elements in this RDD. """ - picklesInJava = self._jrdd.collect().iterator() - return list(self._collect_iterator_through_file(picklesInJava)) + bytesInJava = self._jrdd.collect().iterator() + return list(self._collect_iterator_through_file(bytesInJava)) def _collect_iterator_through_file(self, iterator): # Transferring lots of data through Py4J can be slow because @@ -354,10 +410,10 @@ class RDD(object): # file and read it back. tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) tempFile.close() - self.ctx._writeIteratorToPickleFile(iterator, tempFile.name) + self.ctx._writeToFile(iterator, tempFile.name) # Read the data into Python and deserialize it: with open(tempFile.name, 'rb') as tempFile: - for item in read_from_pickle_file(tempFile): + for item in self._jrdd_deserializer.load_stream(tempFile): yield item os.unlink(tempFile.name) @@ -425,7 +481,7 @@ class RDD(object): 3 """ return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() - + def stats(self): """ Return a L{StatCounter} object that captures the mean, variance @@ -462,7 +518,7 @@ class RDD(object): 0.816... """ return self.stats().stdev() - + def sampleStdev(self): """ Compute the sample standard deviation of this RDD's elements (which corrects for bias in @@ -523,9 +579,14 @@ class RDD(object): # Take only up to num elements from each partition we try mapped = self.mapPartitions(takeUpToNum) items = [] + # TODO(shivaram): Similar to the scala implementation, update the take + # method to scan multiple splits based on an estimate of how many elements + # we have per-split. for partition in range(mapped._jrdd.splits().size()): - iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition) - items.extend(self._collect_iterator_through_file(iterator)) + partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) + partitionsToTake[0] = partition + iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() + items.extend(mapped._collect_iterator_through_file(iterator)) if len(items) >= num: break return items[:num] @@ -552,7 +613,10 @@ class RDD(object): '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' """ def func(split, iterator): - return (str(x).encode("utf-8") for x in iterator) + for x in iterator: + if not isinstance(x, basestring): + x = unicode(x) + yield x.encode("utf-8") keyed = PipelinedRDD(self, func) keyed._bypass_serializer = True keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) @@ -689,20 +753,23 @@ class RDD(object): # Transferring O(n) objects to Java is too expensive. Instead, we'll # form the hash buckets in Python, transferring O(numPartitions) objects # to Java. Each object is a (splitNumber, [objects]) pair. + outputSerializer = self.ctx._unbatched_serializer def add_shuffle_key(split, iterator): + buckets = defaultdict(list) + for (k, v) in iterator: buckets[partitionFunc(k) % numPartitions].append((k, v)) for (split, items) in buckets.iteritems(): - yield str(split) - yield dump_pickle(Batch(items)) + yield pack_long(split) + yield outputSerializer.dumps(items) keyed = PipelinedRDD(self, add_shuffle_key) keyed._bypass_serializer = True pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, id(partitionFunc)) jrdd = pairRDD.partitionBy(partitioner).values() - rdd = RDD(jrdd, self.ctx) + rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) # This is required so that id(partitionFunc) remains unique, even if # partitionFunc is a lambda: rdd._partitionFunc = partitionFunc @@ -739,7 +806,8 @@ class RDD(object): numPartitions = self.ctx.defaultParallelism def combineLocally(iterator): combiners = {} - for (k, v) in iterator: + for x in iterator: + (k, v) = x if k not in combiners: combiners[k] = createCombiner(v) else: @@ -830,9 +898,9 @@ class RDD(object): >>> y = sc.parallelize([("a", 3), ("c", None)]) >>> sorted(x.subtractByKey(y).collect()) [('b', 4), ('b', 5)] - """ - filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0 - map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]] + """ + filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 + map_func = lambda (key, vals): [(key, val) for val in vals[0]] return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) def subtract(self, other, numPartitions=None): @@ -881,50 +949,52 @@ class PipelinedRDD(RDD): 20 """ def __init__(self, prev, func, preservesPartitioning=False): - if isinstance(prev, PipelinedRDD) and prev._is_pipelinable(): + if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): + # This transformation is the first in its stage: + self.func = func + self.preservesPartitioning = preservesPartitioning + self._prev_jrdd = prev._jrdd + self._prev_jrdd_deserializer = prev._jrdd_deserializer + else: prev_func = prev.func def pipeline_func(split, iterator): return func(split, prev_func(split, iterator)) self.func = pipeline_func self.preservesPartitioning = \ prev.preservesPartitioning and preservesPartitioning - self._prev_jrdd = prev._prev_jrdd - else: - self.func = func - self.preservesPartitioning = preservesPartitioning - self._prev_jrdd = prev._jrdd + self._prev_jrdd = prev._prev_jrdd # maintain the pipeline + self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer self.is_cached = False self.is_checkpointed = False self.ctx = prev.ctx self.prev = prev self._jrdd_val = None + self._jrdd_deserializer = self.ctx.serializer self._bypass_serializer = False @property def _jrdd(self): if self._jrdd_val: return self._jrdd_val - func = self.func - if not self._bypass_serializer and self.ctx.batchSize != 1: - oldfunc = self.func - batchSize = self.ctx.batchSize - def batched_func(split, iterator): - return batched(oldfunc(split, iterator), batchSize) - func = batched_func - cmds = [func, self._bypass_serializer] - pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds) + if self._bypass_serializer: + serializer = NoOpSerializer() + else: + serializer = self.ctx.serializer + command = (self.func, self._prev_jrdd_deserializer, serializer) + pickled_command = CloudPickleSerializer().dumps(command) broadcast_vars = ListConverter().convert( [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], self.ctx._gateway._gateway_client) self.ctx._pickled_broadcast_vars.clear() - class_manifest = self._prev_jrdd.classManifest() + class_tag = self._prev_jrdd.classTag() env = MapConverter().convert(self.ctx.environment, self.ctx._gateway._gateway_client) includes = ListConverter().convert(self.ctx._python_includes, self.ctx._gateway._gateway_client) python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), - pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec, - broadcast_vars, self.ctx._javaAccumulator, class_manifest) + bytearray(pickled_command), env, includes, self.preservesPartitioning, + self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, + class_tag) self._jrdd_val = python_rdd.asJavaRDD() return self._jrdd_val |