author     Josh Rosen <joshrosen@databricks.com>   2015-09-19 21:40:21 -0700
committer  Reynold Xin <rxin@databricks.com>       2015-09-19 21:40:21 -0700
commit     2117eea71ece825fbc3797c8b38184ae221f5223 (patch)
tree       06481ef1968367118e89779335e24245f57f2017 /core
parent     e789000b88a6bd840f821c53f42c08b97dc02496 (diff)
[SPARK-10710] Remove ability to disable spilling in core and SQL
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`. This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8831 from JoshRosen/remove-ability-to-disable-spilling.
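For context on the warnings this change adds: when the removed setting is still supplied, the shuffle managers simply log a message and spill anyway. Below is a minimal, self-contained sketch of that warn-and-ignore check; the plain `Map` and the `logWarning` parameter are stand-ins for Spark's `SparkConf` and `Logging` trait, not the real classes.

```scala
// Sketch of the warn-and-ignore check the shuffle managers now perform.
// The conf Map and the logWarning function stand in for SparkConf and the Logging trait.
object SpillConfigWarning {
  def warnIfSpillDisabled(conf: Map[String, String], logWarning: String => Unit): Unit = {
    // Absent key defaults to true, mirroring conf.getBoolean("spark.shuffle.spill", true).
    val spillEnabled = conf.get("spark.shuffle.spill").forall(_.toBoolean)
    if (!spillEnabled) {
      logWarning(
        "spark.shuffle.spill was set to false, but this configuration is ignored as of " +
          "Spark 1.6+. Shuffle will continue to spill to disk when necessary.")
    }
  }

  def main(args: Array[String]): Unit = {
    // A user who still sets the removed flag sees a warning but gets spilling regardless.
    warnIfSpillDisabled(Map("spark.shuffle.spill" -> "false"), msg => println(s"WARN: $msg"))
  }
}
```

Logging rather than failing keeps existing job configurations working unchanged while surfacing that the flag no longer has any effect.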
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                       59
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                 40
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala   8
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala    6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala          22
6 files changed, 51 insertions, 94 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 289aab9bd9..7196e57d5d 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -18,7 +18,7 @@
package org.apache.spark
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap
/**
* :: DeveloperApi ::
@@ -34,59 +34,30 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
- // When spilling is enabled sorting will happen externally, but not necessarily with an
- // ExternalSorter.
- private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
-
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
- context: TaskContext): Iterator[(K, C)] = {
- if (!isSpillEnabled) {
- val combiners = new AppendOnlyMap[K, C]
- var kv: Product2[K, V] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
- }
- while (iter.hasNext) {
- kv = iter.next()
- combiners.changeValue(kv._1, update)
- }
- combiners.iterator
- } else {
- val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
- combiners.insertAll(iter)
- updateMetrics(context, combiners)
- combiners.iterator
- }
+ def combineValuesByKey(
+ iter: Iterator[_ <: Product2[K, V]],
+ context: TaskContext): Iterator[(K, C)] = {
+ val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ combiners.insertAll(iter)
+ updateMetrics(context, combiners)
+ combiners.iterator
}
@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)
- def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
- : Iterator[(K, C)] =
- {
- if (!isSpillEnabled) {
- val combiners = new AppendOnlyMap[K, C]
- var kc: Product2[K, C] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
- }
- while (iter.hasNext) {
- kc = iter.next()
- combiners.changeValue(kc._1, update)
- }
- combiners.iterator
- } else {
- val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
- combiners.insertAll(iter)
- updateMetrics(context, combiners)
- combiners.iterator
- }
+ def combineCombinersByKey(
+ iter: Iterator[_ <: Product2[K, C]],
+ context: TaskContext): Iterator[(K, C)] = {
+ val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
+ combiners.insertAll(iter)
+ updateMetrics(context, combiners)
+ combiners.iterator
}
/** Update task metrics after populating the external map. */
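The surviving path above always routes records through `ExternalAppendOnlyMap`, which spills to disk under memory pressure. As a rough illustration of the combine-by-key logic only (not Spark code), here is a self-contained sketch in which an ordinary `HashMap` stands in for the spillable map:

```scala
import scala.collection.mutable

// Self-contained sketch of combine-by-key as combineValuesByKey performs it; a plain
// HashMap stands in for ExternalAppendOnlyMap, which additionally spills to disk.
object CombineByKeySketch {
  def combineValuesByKey[K, V, C](
      iter: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, v) <- iter) {
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)   // key seen before: fold the new value in
        case None    => createCombiner(v)  // first value for this key
      }
    }
    combiners.iterator
  }

  def main(args: Array[String]): Unit = {
    // Word count: the combiner is just the running Int count.
    val pairs = Iterator("a" -> 1, "b" -> 1, "a" -> 1)
    println(combineValuesByKey[String, Int, Int](pairs, identity, _ + _).toList)
  }
}
```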
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7bad749d58..935c3babd8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap}
import org.apache.spark.util.Utils
import org.apache.spark.serializer.Serializer
@@ -128,8 +128,6 @@ class CoGroupedRDD[K: ClassTag](
override val partitioner: Some[Partitioner] = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
- val sparkConf = SparkEnv.get.conf
- val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length
@@ -150,34 +148,16 @@ class CoGroupedRDD[K: ClassTag](
rddIterators += ((it, depNum))
}
- if (!externalSorting) {
- val map = new AppendOnlyMap[K, CoGroupCombiner]
- val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
- if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
- }
- val getCombiner: K => CoGroupCombiner = key => {
- map.changeValue(key, update)
- }
- rddIterators.foreach { case (it, depNum) =>
- while (it.hasNext) {
- val kv = it.next()
- getCombiner(kv._1)(depNum) += kv._2
- }
- }
- new InterruptibleIterator(context,
- map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
- } else {
- val map = createExternalMap(numRdds)
- for ((it, depNum) <- rddIterators) {
- map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
- }
- context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
- context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
- new InterruptibleIterator(context,
- map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
+ val map = createExternalMap(numRdds)
+ for ((it, depNum) <- rddIterators) {
+ map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
+ context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
+ context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ new InterruptibleIterator(context,
+ map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
private def createExternalMap(numRdds: Int)
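As in `Aggregator`, `compute` now always builds its per-key combiners in the spillable external map. The grouping itself can be sketched without Spark; in the stand-alone example below, a plain `HashMap` of buffers takes the place of the external map and a `Seq` of iterators takes the place of the dependency iterators:

```scala
import scala.collection.mutable

// Sketch of the cogroup combine: one buffer per parent RDD, keyed by K.
object CoGroupSketch {
  def cogroup[K, V](inputs: Seq[Iterator[(K, V)]]): Map[K, Seq[Seq[V]]] = {
    val numRdds = inputs.length
    val combiners = mutable.HashMap.empty[K, Array[mutable.ArrayBuffer[V]]]
    for ((it, depNum) <- inputs.zipWithIndex; (k, v) <- it) {
      // One ArrayBuffer per parent; depNum says which parent this pair came from.
      val groups = combiners.getOrElseUpdate(k, Array.fill(numRdds)(mutable.ArrayBuffer.empty[V]))
      groups(depNum) += v
    }
    combiners.map { case (k, groups) => k -> groups.toSeq.map(_.toSeq) }.toMap
  }

  def main(args: Array[String]): Unit = {
    val left  = Iterator("x" -> 1, "y" -> 2)
    val right = Iterator("x" -> 3)
    // e.g. Map(x -> List(List(1), List(3)), y -> List(List(2), List()))
    println(cogroup(Seq(left, right)))
  }
}
```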
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index c089088f40..0b46634b8b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -24,7 +24,13 @@ import org.apache.spark.shuffle._
* A ShuffleManager using hashing, that creates one output file per reduce partition on each
* mapper (possibly reusing these across waves of tasks).
*/
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+ if (!conf.getBoolean("spark.shuffle.spill", true)) {
+ logWarning(
+ "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
+ " Shuffle will continue to spill to disk when necessary.")
+ }
private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index d7fab351ca..476cc1f303 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,11 +19,17 @@ package org.apache.spark.shuffle.sort
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency}
+import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency}
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.hash.HashShuffleReader
-private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+ if (!conf.getBoolean("spark.shuffle.spill", true)) {
+ logWarning(
+ "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
+ " Shuffle will continue to spill to disk when necessary.")
+ }
private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 31230d5978..2a30f751ff 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -116,8 +116,6 @@ private[spark] class ExternalSorter[K, V, C](
private val ser = Serializer.getSerializer(serializer)
private val serInstance = ser.newInstance()
- private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
@@ -229,10 +227,6 @@ private[spark] class ExternalSorter[K, V, C](
* @param usingMap whether we're using a map or buffer as our current in-memory collection
*/
private def maybeSpillCollection(usingMap: Boolean): Unit = {
- if (!spillingEnabled) {
- return
- }
-
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
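With the early return removed, `maybeSpillCollection` always estimates the in-memory collection's size and lets the spill machinery decide what to do. The sketch below shows the general estimate-then-maybe-spill shape only; the threshold, the per-element size estimate, and the spill stand-in are illustrative assumptions rather than `ExternalSorter`'s actual logic:

```scala
import scala.collection.mutable

// Sketch of estimate-then-maybe-spill: every insert updates a size estimate and may
// trigger a spill. Spark's ExternalSorter sorts and writes the contents to disk
// instead of simply discarding them as this toy does.
final class SpillableBuffer[T](spillThresholdBytes: Long)(estimateBytes: T => Long) {
  private val buffer = mutable.ArrayBuffer.empty[T]
  private var estimatedSize = 0L
  var spillCount = 0

  def insert(elem: T): Unit = {
    buffer += elem
    estimatedSize += estimateBytes(elem)
    maybeSpill()
  }

  private def maybeSpill(): Unit = {
    if (estimatedSize >= spillThresholdBytes) {
      buffer.clear()        // stand-in for sorting and writing the contents to disk
      estimatedSize = 0L
      spillCount += 1
    }
  }
}

object SpillableBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new SpillableBuffer[String](spillThresholdBytes = 64)(_.length.toLong)
    (1 to 100).foreach(i => buf.insert(s"record-$i"))
    println(s"spilled ${buf.spillCount} times")  // spills whenever the estimate crosses 64 bytes
  }
}
```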
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1110ca6051..1fd470cd3b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -147,7 +147,7 @@ class SparkSubmitSuite
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "beauty",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -166,7 +166,7 @@ class SparkSubmitSuite
mainClass should be ("org.apache.spark.deploy.yarn.Client")
classpath should have length (0)
sysProps("spark.app.name") should be ("beauty")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}
@@ -185,7 +185,7 @@ class SparkSubmitSuite
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "trill",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -206,7 +206,7 @@ class SparkSubmitSuite
sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
sysProps("SPARK_SUBMIT") should be ("true")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles standalone cluster mode") {
@@ -229,7 +229,7 @@ class SparkSubmitSuite
"--supervise",
"--driver-memory", "4g",
"--driver-cores", "5",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -253,9 +253,9 @@ class SparkSubmitSuite
sysProps.keys should contain ("spark.driver.memory")
sysProps.keys should contain ("spark.driver.cores")
sysProps.keys should contain ("spark.driver.supervise")
- sysProps.keys should contain ("spark.shuffle.spill")
+ sysProps.keys should contain ("spark.ui.enabled")
sysProps.keys should contain ("spark.submit.deployMode")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles standalone client mode") {
@@ -266,7 +266,7 @@ class SparkSubmitSuite
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -277,7 +277,7 @@ class SparkSubmitSuite
classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles mesos client mode") {
@@ -288,7 +288,7 @@ class SparkSubmitSuite
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -299,7 +299,7 @@ class SparkSubmitSuite
classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles confs with flag equivalents") {