author     Reynold Xin <rxin@apache.org>  2014-01-14 14:52:24 -0800
committer  Reynold Xin <rxin@apache.org>  2014-01-14 14:52:24 -0800
commit     d601a76d1fdd25b95020b2e32bacde583cf6aa50 (patch)
tree       65b9be8095d6a91690ca207c29b729cb6e9fe2ff
parent     2ce23a55a3c4033873bb262919d89e5afabb9134 (diff)
parent     8ea2cd56e4a243a834214d04e29502a5fdb539df (diff)
Merge pull request #427 from pwendell/deprecate-aggregator
Deprecate rather than remove old combineValuesByKey function
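For callers of the old API, the migration is mechanical: pass the task's TaskContext as a second argument. A minimal sketch, assuming it runs inside a Spark task where SparkEnv is initialized (the sum-per-key combiner functions and the sample input are hypothetical):

    import org.apache.spark.{Aggregator, TaskContext}

    // Hypothetical aggregator that sums Int values per key.
    val agg = Aggregator[String, Int, Int](
      createCombiner = (v: Int) => v,
      mergeValue = (c: Int, v: Int) => c + v,
      mergeCombiners = (c1: Int, c2: Int) => c1 + c2)

    val input = Iterator(("a", 1), ("a", 2), ("b", 3))

    // Deprecated as of 0.9.0: no TaskContext, so spill metrics are not recorded.
    val combined = agg.combineValuesByKey(input)

    // Preferred: pass the TaskContext the runtime hands to the task, so
    // memoryBytesSpilled/diskBytesSpilled are recorded when spilling occurs.
    // agg.combineValuesByKey(input, context)  // context: TaskContext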
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala | 22 +++++++++++++++++-----
1 file changed, 17 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 6d439fdc68..edbea6ea56 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import scala.{Option, deprecated}
+
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
@@ -34,8 +36,12 @@ case class Aggregator[K, V, C] (
private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
+ @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
+ combineValuesByKey(iter, null)
+
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
- context: TaskContext) : Iterator[(K, C)] = {
+ context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -53,12 +59,17 @@ case class Aggregator[K, V, C] (
val (k, v) = iter.next()
combiners.insert(k, v)
}
- context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+ // TODO: Make this non optional in a future release
+ Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+ Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}
+ @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
+ def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
+ combineCombinersByKey(iter, null)
+
def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
@@ -77,8 +88,9 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
- context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+ // TODO: Make this non optional in a future release
+ Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+ Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}
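The null guard in the new metrics lines relies on a standard Scala idiom: Option(x) is None when x is null, so the foreach body is skipped entirely for the deprecated overloads that pass null, and no NullPointerException is thrown. A self-contained sketch of the pattern (the Metrics class is a hypothetical stand-in for TaskContext.taskMetrics):

    // Hypothetical stand-in for a task's mutable metrics object.
    class Metrics { var memoryBytesSpilled: Long = 0L }

    def recordSpill(metrics: Metrics, spilled: Long): Unit = {
      // Option(null) == None, so the update is silently skipped when the
      // caller (e.g. a deprecated overload) passes null for the context.
      Option(metrics).foreach(m => m.memoryBytesSpilled = spilled)
    }

    recordSpill(null, 1024L)          // no-op, no NullPointerException
    recordSpill(new Metrics, 1024L)   // records the spill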