diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-14 14:52:24 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-14 14:52:24 -0800 |
commit | d601a76d1fdd25b95020b2e32bacde583cf6aa50 (patch) | |
tree | 65b9be8095d6a91690ca207c29b729cb6e9fe2ff | |
parent | 2ce23a55a3c4033873bb262919d89e5afabb9134 (diff) | |
parent | 8ea2cd56e4a243a834214d04e29502a5fdb539df (diff) | |
download | spark-d601a76d1fdd25b95020b2e32bacde583cf6aa50.tar.gz spark-d601a76d1fdd25b95020b2e32bacde583cf6aa50.tar.bz2 spark-d601a76d1fdd25b95020b2e32bacde583cf6aa50.zip |
Merge pull request #427 from pwendell/deprecate-aggregator
Deprecate rather than remove old combineValuesByKey function
-rw-r--r-- | core/src/main/scala/org/apache/spark/Aggregator.scala | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 6d439fdc68..edbea6ea56 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -17,6 +17,8 @@ package org.apache.spark +import scala.{Option, deprecated} + import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap} /** @@ -34,8 +36,12 @@ case class Aggregator[K, V, C] ( private val sparkConf = SparkEnv.get.conf private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true) + @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") + def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = + combineValuesByKey(iter, null) + def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], - context: TaskContext) : Iterator[(K, C)] = { + context: TaskContext): Iterator[(K, C)] = { if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null @@ -53,12 +59,17 @@ case class Aggregator[K, V, C] ( val (k, v) = iter.next() combiners.insert(k, v) } - context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled + // TODO: Make this non optional in a future release + Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) + Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled) combiners.iterator } } + @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0") + def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = + combineCombinersByKey(iter, null) + def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = { if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] @@ -77,8 +88,9 @@ case class Aggregator[K, V, C] ( val (k, c) = iter.next() combiners.insert(k, c) } - context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled + // TODO: Make this non optional in a future release + Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) + Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled) combiners.iterator } } |