diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-31 10:50:08 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-31 10:50:08 -0800 |
commit | 94ddc91d063f290a0e230a153f9e63b2f7357d4a (patch) | |
tree | 468c990b9dae3a2e8cd0c3f839e67aa4aae0748d | |
parent | 347fafe4fccc9345ed0ffa6c7863bc233c079b43 (diff) | |
download | spark-94ddc91d063f290a0e230a153f9e63b2f7357d4a.tar.gz spark-94ddc91d063f290a0e230a153f9e63b2f7357d4a.tar.bz2 spark-94ddc91d063f290a0e230a153f9e63b2f7357d4a.zip |
Address Aaron's and Jerry's comments
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 7 |
2 files changed, 6 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 77a594a3e4..1b2e5417e7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -169,10 +169,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: } val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (combiner1, combiner2) => { - combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map { - case (v1, v2) => v1 ++ v2 + combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 } } - } new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner]( createCombiner, mergeValue, mergeCombiners) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 0e8f46cfc7..680ebf9b80 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -257,14 +257,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // Iterate through (K, G) pairs in sorted order from an on-disk map private class DiskKGIterator(file: File) extends Iterator[(K, G)] { - val in = ser.deserializeStream(new FileInputStream(file)) + val fstream = new FileInputStream(file) + val dstream = ser.deserializeStream(fstream) var nextItem: Option[(K, G)] = None var eof = false def readNextItem(): Option[(K, G)] = { if (!eof) { try { - return Some(in.readObject().asInstanceOf[(K, G)]) + return Some(dstream.readObject().asInstanceOf[(K, G)]) } catch { case e: EOFException => eof = true @@ -296,6 +297,8 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // TODO: Ensure this gets called even if the iterator isn't drained. def cleanup() { + fstream.close() + dstream.close() file.delete() } } |