about summary refs log tree commit diff
diff options
context:
space:
mode:
author	Andrew Or <andrewor14@gmail.com>	2013-12-31 10:50:08 -0800
committer	Andrew Or <andrewor14@gmail.com>	2013-12-31 10:50:08 -0800
commit	94ddc91d063f290a0e230a153f9e63b2f7357d4a (patch)
tree	468c990b9dae3a2e8cd0c3f839e67aa4aae0748d
parent	347fafe4fccc9345ed0ffa6c7863bc233c079b43 (diff)
download	spark-94ddc91d063f290a0e230a153f9e63b2f7357d4a.tar.gz
	spark-94ddc91d063f290a0e230a153f9e63b2f7357d4a.tar.bz2
	spark-94ddc91d063f290a0e230a153f9e63b2f7357d4a.zip
Address Aaron's and Jerry's comments
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala	4
-rw-r--r--	core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala	7
2 files changed, 6 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 77a594a3e4..1b2e5417e7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -169,10 +169,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
(combiner1, combiner2) => {
- combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map {
- case (v1, v2) => v1 ++ v2
+ combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
}
- }
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
createCombiner, mergeValue, mergeCombiners)
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0e8f46cfc7..680ebf9b80 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -257,14 +257,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// Iterate through (K, G) pairs in sorted order from an on-disk map
private class DiskKGIterator(file: File) extends Iterator[(K, G)] {
- val in = ser.deserializeStream(new FileInputStream(file))
+ val fstream = new FileInputStream(file)
+ val dstream = ser.deserializeStream(fstream)
var nextItem: Option[(K, G)] = None
var eof = false
def readNextItem(): Option[(K, G)] = {
if (!eof) {
try {
- return Some(in.readObject().asInstanceOf[(K, G)])
+ return Some(dstream.readObject().asInstanceOf[(K, G)])
} catch {
case e: EOFException =>
eof = true
@@ -296,6 +297,8 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// TODO: Ensure this gets called even if the iterator isn't drained.
def cleanup() {
+ fstream.close()
+ dstream.close()
file.delete()
}
}