diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-12-26 21:23:18 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-26 23:40:08 -0800 |
commit | 0f66b7f2fc25704fc299917a138a530c1c5f13c2 (patch) | |
tree | 74bcf750251f90cb5cff2ac60c4aaa30dfc4cb99 /core | |
parent | ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a (diff) | |
download | spark-0f66b7f2fc25704fc299917a138a530c1c5f13c2.tar.gz spark-0f66b7f2fc25704fc299917a138a530c1c5f13c2.tar.bz2 spark-0f66b7f2fc25704fc299917a138a530c1c5f13c2.zip |
Return efficient iterator if no spillage happened
Diffstat (limited to 'core')
3 files changed, 20 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index f977c03d3a..aedb832eb5 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -17,7 +17,9 @@ package org.apache.spark -import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} +import scala.reflect.ClassTag + +import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap} /** * A set of functions used to aggregate data. @@ -26,7 +28,7 @@ import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} * @param mergeValue function to merge a new value into the aggregation result. * @param mergeCombiners function to merge outputs from multiple mergeValue function. */ -case class Aggregator[K, V, C] ( +case class Aggregator[K, V, C: ClassTag] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 9512b418d7..0316d89398 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -69,7 +69,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, @@ -107,7 +107,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = { @@ -296,8 +296,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) - : RDD[(K, C)] = { + def combineByKey[C: ClassTag]( + createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index ed8b1d36a9..991dd1845d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -18,7 +18,9 @@ package org.apache.spark.util.collection import java.io._ + import scala.collection.mutable.{ArrayBuffer, PriorityQueue} +import scala.reflect.ClassTag /** * A wrapper for SpillableAppendOnlyMap that handles two cases: @@ -29,7 +31,7 @@ import scala.collection.mutable.{ArrayBuffer, PriorityQueue} * (2) Otherwise, group values of the same key together before disk spill, and merge them * into combiners only after reading them back from disk. */ -class ExternalAppendOnlyMap[K, V, C]( +class ExternalAppendOnlyMap[K, V, C: ClassTag]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) @@ -74,7 +76,7 @@ class ExternalAppendOnlyMap[K, V, C]( * An append-only map that spills sorted content to disk when the memory threshold is exceeded. * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V]. */ -class SpillableAppendOnlyMap[K, V, M, C]( +class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag]( createGroup: V => M, mergeValue: (M, V) => M, mergeGroups: (M, M) => M, @@ -114,7 +116,13 @@ class SpillableAppendOnlyMap[K, V, M, C]( oldMaps.append(new DiskIterator(file)) } - override def iterator: Iterator[(K, C)] = new ExternalIterator() + override def iterator: Iterator[(K, C)] = { + if (oldMaps.isEmpty && implicitly[ClassTag[M]] == implicitly[ClassTag[C]]) { + currentMap.iterator.asInstanceOf[Iterator[(K, C)]] + } else { + new ExternalIterator() + } + } // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs class ExternalIterator extends Iterator[(K, C)] { |