author     Prashant Sharma <prashant.s@imaginea.com>  2013-10-08 11:29:40 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>  2013-10-08 11:29:40 +0530
commit     7be75682b931dd52014f3cfdc6887e54583ad0af
tree       b0dd6a28def2562a6f63162034b75f6a8e292278 /python/pyspark
parent     3e41495288c897ee3d3b31d5f4836b1cf6ba54d9
parent     ea34c521025d3408d44d45ab5c132fd9791794f6
Merge branch 'master' into wip-merge-master
Conflicts:
bagel/pom.xml
core/pom.xml
core/src/test/scala/org/apache/spark/ui/UISuite.scala
examples/pom.xml
mllib/pom.xml
pom.xml
project/SparkBuild.scala
repl/pom.xml
streaming/pom.xml
tools/pom.xml
In Scala 2.10 a shorter representation, the binary version (e.g. 2.10 rather than 2.10.0),
is used for naming artifacts, so the artifact suffixes were changed to the shorter Scala
version, and that version string was made a property in the pom.
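To make the naming scheme concrete, here is a toy Python sketch, not code from this commit (whose build changes live in pom.xml and project/SparkBuild.scala), of how the shorter suffix relates to the full Scala version; the helper name and the spark-core example are illustrative only:

    def scala_binary_version(full_version):
        # From Scala 2.10 on, releases within one minor line (2.10.0, 2.10.1, ...)
        # are binary compatible, so artifacts carry only a "major.minor" suffix.
        major, minor = full_version.split(".")[:2]
        return "%s.%s" % (major, minor)

    # prints spark-core_2.10 rather than spark-core_2.10.0
    print("spark-core_" + scala_binary_version("2.10.0"))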
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/rdd.py          | 10
-rw-r--r--  python/pyspark/serializers.py  |  4
2 files changed, 10 insertions, 4 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7611b13e82..33dc865256 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,7 +29,7 @@ from threading import Thread
 
 from pyspark import cloudpickle
 from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
-    read_from_pickle_file
+    read_from_pickle_file, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -690,11 +690,13 @@ class RDD(object):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java.  Each object is a (splitNumber, [objects]) pair.
         def add_shuffle_key(split, iterator):
+
             buckets = defaultdict(list)
+
             for (k, v) in iterator:
                 buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
-                yield str(split)
+                yield pack_long(split)
                 yield dump_pickle(Batch(items))
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
@@ -831,8 +833,8 @@ class RDD(object):
         >>> sorted(x.subtractByKey(y).collect())
         [('b', 4), ('b', 5)]
         """
-        filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
-        map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
         return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
 
     def subtract(self, other, numPartitions=None):
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index fecacd1241..54fed1c9c7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -67,6 +67,10 @@ def write_long(value, stream):
     stream.write(struct.pack("!q", value))
 
 
+def pack_long(value):
+    return struct.pack("!q", value)
+
+
 def read_int(stream):
     length = stream.read(4)
     if length == "":
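For context on the serializers.py change: struct.pack("!q", value) encodes an integer as a fixed 8-byte big-endian signed long, the framing a Java DataInputStream expects, whereas the old str(split) produced a variable-length decimal string. Below is a minimal, self-contained sketch of the shuffle-key pattern the rdd.py hunk touches, ported to Python 3 syntax (items() instead of iteritems()); taking partition_func and num_partitions as plain arguments and yielding the raw bucket instead of dump_pickle(Batch(items)) are simplifications for the sketch, not the real method's interface:

    import struct
    from collections import defaultdict

    def pack_long(value):
        # "!" = network (big-endian) byte order, "q" = signed 64-bit integer,
        # so the result is always exactly 8 bytes regardless of the value.
        return struct.pack("!q", value)

    def add_shuffle_key(iterator, partition_func, num_partitions):
        # Build the hash buckets in Python, then emit one (packed partition id,
        # bucket) pair per non-empty partition, so only O(num_partitions)
        # objects cross the Python/JVM boundary instead of O(n).
        buckets = defaultdict(list)
        for k, v in iterator:
            buckets[partition_func(k) % num_partitions].append((k, v))
        for split, items in buckets.items():
            yield pack_long(split)
            yield items  # the real code pickles a batch of items here

    pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
    for token in add_shuffle_key(iter(pairs), hash, 2):
        print(repr(token))

    assert pack_long(1) == b"\x00\x00\x00\x00\x00\x00\x00\x01"
    assert struct.unpack("!q", pack_long(-5)) == (-5,)

The subtractByKey hunk also moves back to tuple-parameter lambdas such as lambda (key, vals): ...; that syntax is Python 2 only (removed in Python 3 by PEP 3113), which is why the sketch above sticks to plain arguments.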