author     Aaron Davidson <aaron@databricks.com>    2013-12-26 21:23:18 -0800
committer  Andrew Or <andrewor14@gmail.com>         2013-12-26 23:40:08 -0800
commit     0f66b7f2fc25704fc299917a138a530c1c5f13c2 (patch)
tree       74bcf750251f90cb5cff2ac60c4aaa30dfc4cb99 /core
parent     ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a (diff)
Return efficient iterator if no spillage happened
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                            |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                  |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 14
3 files changed, 20 insertions, 9 deletions
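
The patch threads ClassTag evidence for the group type M and the combiner type C down to the spillable map, so that at runtime the map can tell whether M and C are the same type and skip the merge machinery when nothing spilled. A minimal standalone sketch of that equality check (the name sameRuntimeType is illustrative, not Spark's):

import scala.reflect.ClassTag

// ClassTag defines equality by runtime class, so two tags for the same
// (erased) type compare equal; this is the check the patch relies on.
def sameRuntimeType[M: ClassTag, C: ClassTag]: Boolean =
  implicitly[ClassTag[M]] == implicitly[ClassTag[C]]

sameRuntimeType[Int, Int]     // true
sameRuntimeType[Int, String]  // false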
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index f977c03d3a..aedb832eb5 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,7 +17,9 @@
package org.apache.spark
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
* A set of functions used to aggregate data.
@@ -26,7 +28,7 @@ import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
-case class Aggregator[K, V, C] (
+case class Aggregator[K, V, C: ClassTag] (
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
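
With the new C: ClassTag context bound, callers of Aggregator do not change: the compiler materializes the tag implicitly at the call site. A sketch of constructing one under the patched signature shown above (the combiner type List[Int] is just an example choice):

import org.apache.spark.Aggregator

// ClassTag[List[Int]] is supplied implicitly by the compiler here.
val agg = Aggregator[String, Int, List[Int]](
  createCombiner = v => List(v),            // first value starts a new list
  mergeValue = (c, v) => v :: c,            // prepend further values
  mergeCombiners = (c1, c2) => c1 ::: c2)   // concatenate partial lists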
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 9512b418d7..0316d89398 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -69,7 +69,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
- def combineByKey[C](createCombiner: V => C,
+ def combineByKey[C: ClassTag](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
@@ -107,7 +107,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
/**
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
- def combineByKey[C](createCombiner: V => C,
+ def combineByKey[C: ClassTag](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = {
@@ -296,8 +296,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = {
+ def combineByKey[C: ClassTag](
+ createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+ : RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
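
At call sites the extra bound is likewise invisible as long as C has an inferable ClassTag. A sketch of a typical combineByKey use under the patched signature, assuming a SparkContext named sc is in scope; the per-key (sum, count) combiner is an illustrative choice:

import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

// ClassTag[(Int, Int)] is resolved implicitly for the new C: ClassTag bound.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sumCounts = pairs.combineByKey[(Int, Int)](
  v => (v, 1),                              // first value seen for a key
  (acc, v) => (acc._1 + v, acc._2 + 1),     // fold another value into the pair
  (a, b) => (a._1 + b._1, a._2 + b._2))     // merge map-side partial results
val avgs = sumCounts.mapValues { case (sum, count) => sum.toDouble / count }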
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index ed8b1d36a9..991dd1845d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -18,7 +18,9 @@
package org.apache.spark.util.collection
import java.io._
+
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
+import scala.reflect.ClassTag
/**
* A wrapper for SpillableAppendOnlyMap that handles two cases:
@@ -29,7 +31,7 @@ import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
* (2) Otherwise, group values of the same key together before disk spill, and merge them
* into combiners only after reading them back from disk.
*/
-class ExternalAppendOnlyMap[K, V, C](
+class ExternalAppendOnlyMap[K, V, C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)
@@ -74,7 +76,7 @@ class ExternalAppendOnlyMap[K, V, C](
* An append-only map that spills sorted content to disk when the memory threshold is exceeded.
* A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
*/
-class SpillableAppendOnlyMap[K, V, M, C](
+class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
createGroup: V => M,
mergeValue: (M, V) => M,
mergeGroups: (M, M) => M,
@@ -114,7 +116,13 @@ class SpillableAppendOnlyMap[K, V, M, C](
oldMaps.append(new DiskIterator(file))
}
- override def iterator: Iterator[(K, C)] = new ExternalIterator()
+ override def iterator: Iterator[(K, C)] = {
+ if (oldMaps.isEmpty && implicitly[ClassTag[M]] == implicitly[ClassTag[C]]) {
+ currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
+ } else {
+ new ExternalIterator()
+ }
+ }
// An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
class ExternalIterator extends Iterator[(K, C)] {
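
Why the fast path is sound: when nothing has spilled, every pair in the in-memory map still carries its group of type M, and if ClassTag[M] == ClassTag[C] those groups already are combiners, so the asInstanceOf cast above changes nothing at runtime. A self-contained sketch of the same decision with simplified names (SpillableSketch and mergeAndCombine are illustrative, not Spark's):

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

class SpillableSketch[K, V, M: ClassTag, C: ClassTag](
    inMemory: Iterable[(K, M)],
    spills: ArrayBuffer[Iterator[(K, M)]],
    mergeAndCombine: () => Iterator[(K, C)]) {

  def iterator: Iterator[(K, C)] =
    if (spills.isEmpty && implicitly[ClassTag[M]] == implicitly[ClassTag[C]]) {
      // No disk data, and M and C share a runtime class: the in-memory
      // iterator already yields (K, C), so return it without copying.
      inMemory.iterator.asInstanceOf[Iterator[(K, C)]]
    } else {
      mergeAndCombine() // sort-merge memory and spilled files into combiners
    }
}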