aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala16
1 files changed, 8 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 0a0f0c36b5..e1e0c4241c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -485,7 +485,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues( pair =>
- for (v <- pair._1; w <- pair._2) yield (v, w)
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
@@ -498,9 +498,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
- pair._1.map(v => (v, None))
+ pair._1.iterator.map(v => (v, None))
} else {
- for (v <- pair._1; w <- pair._2) yield (v, Some(w))
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
@@ -515,9 +515,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
- pair._2.map(w => (None, w))
+ pair._2.iterator.map(w => (None, w))
} else {
- for (v <- pair._1; w <- pair._2) yield (Some(v), w)
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
}
}
}
@@ -533,9 +533,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues {
- case (vs, Seq()) => vs.map(v => (Some(v), None))
- case (Seq(), ws) => ws.map(w => (None, Some(w)))
- case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
+ case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
+ case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
}
}