aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2014-04-08 18:15:52 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-08 18:15:59 -0700
commitce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch)
tree029a7ba0926eb1a8384ba73e74fc0bb018121528 /python
parent12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff)
downloadspark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.gz
spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.bz2
spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.zip
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca> Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits: f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator 77048f8 [Holden Karau] Fix merge up to master d3fe909 [Holden Karau] use toSeq instead 7a092a3 [Holden Karau] switch resultitr to resultiterable eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables c5075aa [Holden Karau] If guava 14 had iterables 2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API 11e730c [Holden Karau] Fix streaming tests 66b583d [Holden Karau] Fix the core test suite to compile 4ed579b [Holden Karau] Refactor from iterator to iterable d052c07 [Holden Karau] Python tests now pass with iterator pandas 3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work" cd1e81c [Holden Karau] Try and make pickling list iterators work c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well 88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming a5ee714 [Holden Karau] oops, was checking wrong iterator e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming ec8cc3e [Holden Karau] Fix test issues\! 4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD" ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas" b692868 [Holden Karau] Revert 7e533f7 [Holden Karau] Fix the bug 8a5153a [Holden Karau] Revert me, but we have some stuff to debug b4e86a9 [Holden Karau] Add a join based on the problem in SVD c4510e2 [Holden Karau] Revert this but for now put things in list pandas b4e0b1d [Holden Karau] Fix style issues 71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness. b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work 37888ec [Holden Karau] core/tests now pass 249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes 6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy" fe992fe [Holden Karau] hmmm try and fix up basic operation suite 172705c [Holden Karau] Fix Java API suite caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy 88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator 4991af6 [Holden Karau] Fix some tests be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after 687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/join.py5
-rw-r--r--python/pyspark/rdd.py10
-rw-r--r--python/pyspark/resultiterable.py33
3 files changed, 41 insertions, 7 deletions
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b..6f94d26ef8 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -31,11 +31,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
+from pyspark.resultiterable import ResultIterable
def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
- return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)
+ return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
def python_join(rdd, other, numPartitions):
@@ -88,5 +89,5 @@ def python_cogroup(rdd, other, numPartitions):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
- return (vbuf, wbuf)
+ return (ResultIterable(vbuf), ResultIterable(wbuf))
return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fb27863e07..91fc7e637e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -38,6 +38,7 @@ from pyspark.join import python_join, python_left_outer_join, \
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
+from pyspark.resultiterable import ResultIterable
from py4j.java_collections import ListConverter, MapConverter
@@ -1118,7 +1119,7 @@ class RDD(object):
Hash-partitions the resulting RDD with into numPartitions partitions.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(x.groupByKey().collect())
+ >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
[('a', [1, 1]), ('b', [1])]
"""
@@ -1133,7 +1134,7 @@ class RDD(object):
return a + b
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
- numPartitions)
+ numPartitions).mapValues(lambda x: ResultIterable(x))
# TODO: add tests
def flatMapValues(self, f):
@@ -1180,7 +1181,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
- >>> sorted(x.cogroup(y).collect())
+ >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
[('a', ([1], [2])), ('b', ([4], []))]
"""
return python_cogroup(self, other, numPartitions)
@@ -1217,7 +1218,7 @@ class RDD(object):
>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
- >>> sorted(x.cogroup(y).collect())
+ >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
[(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
"""
return self.map(lambda x: (f(x), x))
@@ -1317,7 +1318,6 @@ class RDD(object):
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
-
class PipelinedRDD(RDD):
"""
Pipelined maps:
diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py
new file mode 100644
index 0000000000..7f418f8d2e
--- /dev/null
+++ b/python/pyspark/resultiterable.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["ResultIterable"]
+
+import collections
+
+class ResultIterable(collections.Iterable):
+ """
+ A special result iterable. This is used because the standard iterator can not be pickled
+ """
+ def __init__(self, data):
+ self.data = data
+ self.index = 0
+ self.maxindex = len(data)
+ def __iter__(self):
+ return iter(self.data)
+ def __len__(self):
+ return len(self.data)