diff options
author | zsxwing <zsxwing@gmail.com> | 2014-12-22 14:26:28 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-22 14:26:28 -0800 |
commit | c233ab3d8d75a33495298964fe73dbf7dd8fe305 (patch) | |
tree | 49811eb00136741ab5d320ed7d60519561b177f8 /core | |
parent | de9d7d2b5b6d80963505571700e83779fd98f850 (diff) | |
download | spark-c233ab3d8d75a33495298964fe73dbf7dd8fe305.tar.gz spark-c233ab3d8d75a33495298964fe73dbf7dd8fe305.tar.bz2 spark-c233ab3d8d75a33495298964fe73dbf7dd8fe305.zip |
[SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join
In Scala, `map` and `flatMap` of `Iterable` will copy the contents of `Iterable` to a new `Seq`. Such as,
```Scala
val iterable = Seq(1, 2, 3).map(v => {
println(v)
v
})
println("Iterable map done")
val iterator = Seq(1, 2, 3).iterator.map(v => {
println(v)
v
})
println("Iterator map done")
```
outputed
```
1
2
3
Iterable map done
Iterator map done
```
So we should use 'iterator' to reduce memory consumed by join.
Found by Johannes Simon in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3C5BE70814-9D03-4F61-AE2C-0D63F2DE4446%40mail.de%3E
Author: zsxwing <zsxwing@gmail.com>
Closes #3671 from zsxwing/SPARK-4824 and squashes the following commits:
48ee7b9 [zsxwing] Remove the explicit types
95d59d6 [zsxwing] Add 'iterator' to reduce memory consumed by join
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index fe3129b62f..4469c89e6b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -483,7 +483,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues( pair => - for (v <- pair._1; w <- pair._2) yield (v, w) + for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) } @@ -496,9 +496,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { - pair._1.map(v => (v, None)) + pair._1.iterator.map(v => (v, None)) } else { - for (v <- pair._1; w <- pair._2) yield (v, Some(w)) + for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w)) } } } @@ -513,9 +513,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { - pair._2.map(w => (None, w)) + pair._2.iterator.map(w => (None, w)) } else { - for (v <- pair._1; w <- pair._2) yield (Some(v), w) + for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w) } } } @@ -531,9 +531,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { - case (vs, Seq()) => vs.map(v => (Some(v), None)) - case (Seq(), ws) => ws.map(w => (None, Some(w))) - case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w)) + case (vs, Seq()) => vs.iterator.map(v => (Some(v), None)) + case (Seq(), ws) => ws.iterator.map(w => (None, Some(w))) + case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w)) } } |