author	Prashant Sharma <prashant.s@imaginea.com>	2013-10-08 11:29:40 +0530
committer	Prashant Sharma <prashant.s@imaginea.com>	2013-10-08 11:29:40 +0530
commit	7be75682b931dd52014f3cfdc6887e54583ad0af (patch)
tree	b0dd6a28def2562a6f63162034b75f6a8e292278 /python
parent	3e41495288c897ee3d3b31d5f4836b1cf6ba54d9 (diff)
parent	ea34c521025d3408d44d45ab5c132fd9791794f6 (diff)
download	spark-7be75682b931dd52014f3cfdc6887e54583ad0af.tar.gz
	spark-7be75682b931dd52014f3cfdc6887e54583ad0af.tar.bz2
	spark-7be75682b931dd52014f3cfdc6887e54583ad0af.zip
Merge branch 'master' into wip-merge-master
Conflicts:
	bagel/pom.xml
	core/pom.xml
	core/src/test/scala/org/apache/spark/ui/UISuite.scala
	examples/pom.xml
	mllib/pom.xml
	pom.xml
	project/SparkBuild.scala
	repl/pom.xml
	streaming/pom.xml
	tools/pom.xml

In Scala 2.10, a shorter representation is used for naming artifacts, so the artifact names were changed to use the shorter Scala version, and the Scala version was made a property in the POM.
Diffstat (limited to 'python')
-rw-r--r--	python/pyspark/rdd.py	10
-rw-r--r--	python/pyspark/serializers.py	4
2 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7611b13e82..33dc865256 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,7 +29,7 @@ from threading import Thread
 from pyspark import cloudpickle
 from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
-    read_from_pickle_file
+    read_from_pickle_file, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -690,11 +690,13 @@ class RDD(object):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
         def add_shuffle_key(split, iterator):
+
             buckets = defaultdict(list)
+
             for (k, v) in iterator:
                 buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
-                yield str(split)
+                yield pack_long(split)
                 yield dump_pickle(Batch(items))
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
@@ -831,8 +833,8 @@ class RDD(object):
         >>> sorted(x.subtractByKey(y).collect())
         [('b', 4), ('b', 5)]
         """
-        filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
-        map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
         return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
 
     def subtract(self, other, numPartitions=None):
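
Note on the partitionBy hunk above: replacing str(split) with pack_long(split) means each bucket's split number now crosses to Java as a fixed 8-byte big-endian long rather than a variable-length string, followed by the pickled Batch of records. Below is a minimal Python 2 sketch of that framing, for illustration only; cPickle.dumps stands in for dump_pickle(Batch(items)) and plain hash for partitionFunc, neither of which is the actual pyspark wiring.

# Illustrative sketch only (Python 2); not the pyspark implementation.
import struct
from collections import defaultdict
from cPickle import dumps

def pack_long(value):
    # 8-byte big-endian signed long, same format as serializers.pack_long
    return struct.pack("!q", value)

def add_shuffle_key(iterator, numPartitions, partitionFunc=hash):
    # Group records into numPartitions buckets on the Python side.
    buckets = defaultdict(list)
    for (k, v) in iterator:
        buckets[partitionFunc(k) % numPartitions].append((k, v))
    # Emit one (packed split id, pickled batch) pair per bucket.
    for (split, items) in buckets.iteritems():
        yield pack_long(split)   # fixed-width key; previously str(split)
        yield dumps(items, 2)    # stand-in for dump_pickle(Batch(items))

stream = list(add_shuffle_key([("a", 1), ("b", 2)], 2))
# Every key in the stream (even positions) is exactly 8 bytes.
assert all(len(stream[i]) == 8 for i in range(0, len(stream), 2))
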
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index fecacd1241..54fed1c9c7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -67,6 +67,10 @@ def write_long(value, stream):
     stream.write(struct.pack("!q", value))
 
 
+def pack_long(value):
+    return struct.pack("!q", value)
+
+
 def read_int(stream):
     length = stream.read(4)
     if length == "":
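
For context, the pack_long added above mirrors the existing write_long: the same "!q" struct format, but returning the packed bytes instead of writing them to a stream, which is what lets add_shuffle_key yield the key directly. A quick illustrative check (Python 2, not part of the patch):

# Illustrative only: pack_long yields exactly the bytes write_long streams,
# and a "!q" unpack round-trips the value.
import struct
from io import BytesIO

packed = struct.pack("!q", 42)          # what pack_long(42) returns
stream = BytesIO()
stream.write(struct.pack("!q", 42))     # what write_long(42, stream) does
assert packed == stream.getvalue() == "\x00" * 7 + "\x2a"
(value,) = struct.unpack("!q", packed)  # a read_long-style decode
assert value == 42
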