diff options
author | Ilya Ganelin <ilya.ganelin@capitalone.com> | 2014-12-10 14:19:37 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2014-12-10 14:19:37 -0800 |
commit | 447ae2de5d4c2af865fdb63f8b876b865de60f74 (patch) | |
tree | c97a0f11bba2f8c1acab36d3f1e227519df39011 | |
parent | e230da18f8c354b4b80416aa5277420397acf4f2 (diff) | |
download | spark-447ae2de5d4c2af865fdb63f8b876b865de60f74.tar.gz spark-447ae2de5d4c2af865fdb63f8b876b865de60f74.tar.bz2 spark-447ae2de5d4c2af865fdb63f8b876b865de60f74.zip |
[SPARK-4569] Rename 'externalSorting' in Aggregator
Hi all - I've renamed the unhelpfully named variable and added a comment clarifying what's actually happening.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Closes #3666 from ilganeli/SPARK-4569B and squashes the following commits:
1810394 [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator
e2d2092 [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator
d7cefec [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator
5b3f39c [Ilya Ganelin] [SPARK-4569] Rename in Aggregator
-rw-r--r-- | core/src/main/scala/org/apache/spark/Aggregator.scala | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 79c9c451d2..09eb9605fb 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -34,7 +34,9 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) + // When spilling is enabled sorting will happen externally, but not necessarily with an + // ExternalSorter. + private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = @@ -42,7 +44,7 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { @@ -71,9 +73,9 @@ case class Aggregator[K, V, C] ( combineCombinersByKey(iter, null) def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext) - : Iterator[(K, C)] = + : Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kc: Product2[K, C] = null val update = (hadValue: Boolean, oldValue: C) => { |