author     Josh Rosen <joshrosen@databricks.com>   2015-09-19 21:40:21 -0700
committer  Reynold Xin <rxin@databricks.com>       2015-09-19 21:40:21 -0700
commit     2117eea71ece825fbc3797c8b38184ae221f5223
tree       06481ef1968367118e89779335e24245f57f2017
parent     e789000b88a6bd840f821c53f42c08b97dc02496
[SPARK-10710] Remove ability to disable spilling in core and SQL
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`.

This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8831 from JoshRosen/remove-ability-to-disable-spilling.
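The user-visible effect is that `spark.shuffle.spill` becomes a no-op that only triggers a warning (see the `HashShuffleManager` and `SortShuffleManager` hunks below). Here is a minimal, standalone Scala sketch of that behavior — it is not the actual Spark code: it uses a plain `Map` in place of `SparkConf` and stderr in place of `logWarning`, but the config key and warning text match the patch.

    // Standalone sketch: spark.shuffle.spill is still read, but a false value
    // only produces a warning; spilling behavior is no longer affected.
    object SpillConfigCheck {
      def warnIfSpillDisabled(conf: Map[String, String]): Unit = {
        val spillEnabled = conf.getOrElse("spark.shuffle.spill", "true").toBoolean
        if (!spillEnabled) {
          // The real patch routes this through Logging.logWarning in the shuffle managers.
          Console.err.println(
            "spark.shuffle.spill was set to false, but this configuration is ignored " +
              "as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.")
        }
      }

      def main(args: Array[String]): Unit = {
        // Simulate a user who still sets the removed escape hatch.
        warnIfSpillDisabled(Map("spark.shuffle.spill" -> "false"))
      }
    }

The SQL side follows the same pattern: `SET spark.sql.planner.externalSort` is handled as a deprecated key in `SetCommand` and logs a warning instead of changing the planner.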
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                              |  59
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                        |  40
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala         |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala         |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala          |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                 |  22
-rw-r--r--  docs/configuration.md                                                               |  14
-rw-r--r--  docs/sql-programming-guide.md                                                       |   7
-rw-r--r--  python/pyspark/rdd.py                                                               |  25
-rw-r--r--  python/pyspark/shuffle.py                                                           |  30
-rw-r--r--  python/pyspark/tests.py                                                             |  13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                          |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala       |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala               |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala                   |  30
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                    |  26
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala              |   4
18 files changed, 81 insertions(+), 234 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 289aab9bd9..7196e57d5d 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -18,7 +18,7 @@
package org.apache.spark
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap
/**
* :: DeveloperApi ::
@@ -34,59 +34,30 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
- // When spilling is enabled sorting will happen externally, but not necessarily with an
- // ExternalSorter.
- private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
-
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
- context: TaskContext): Iterator[(K, C)] = {
- if (!isSpillEnabled) {
- val combiners = new AppendOnlyMap[K, C]
- var kv: Product2[K, V] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
- }
- while (iter.hasNext) {
- kv = iter.next()
- combiners.changeValue(kv._1, update)
- }
- combiners.iterator
- } else {
- val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
- combiners.insertAll(iter)
- updateMetrics(context, combiners)
- combiners.iterator
- }
+ def combineValuesByKey(
+ iter: Iterator[_ <: Product2[K, V]],
+ context: TaskContext): Iterator[(K, C)] = {
+ val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ combiners.insertAll(iter)
+ updateMetrics(context, combiners)
+ combiners.iterator
}
@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)
- def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
- : Iterator[(K, C)] =
- {
- if (!isSpillEnabled) {
- val combiners = new AppendOnlyMap[K, C]
- var kc: Product2[K, C] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
- }
- while (iter.hasNext) {
- kc = iter.next()
- combiners.changeValue(kc._1, update)
- }
- combiners.iterator
- } else {
- val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
- combiners.insertAll(iter)
- updateMetrics(context, combiners)
- combiners.iterator
- }
+ def combineCombinersByKey(
+ iter: Iterator[_ <: Product2[K, C]],
+ context: TaskContext): Iterator[(K, C)] = {
+ val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
+ combiners.insertAll(iter)
+ updateMetrics(context, combiners)
+ combiners.iterator
}
/** Update task metrics after populating the external map. */
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7bad749d58..935c3babd8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap}
import org.apache.spark.util.Utils
import org.apache.spark.serializer.Serializer
@@ -128,8 +128,6 @@ class CoGroupedRDD[K: ClassTag](
override val partitioner: Some[Partitioner] = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
- val sparkConf = SparkEnv.get.conf
- val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length
@@ -150,34 +148,16 @@ class CoGroupedRDD[K: ClassTag](
rddIterators += ((it, depNum))
}
- if (!externalSorting) {
- val map = new AppendOnlyMap[K, CoGroupCombiner]
- val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
- if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
- }
- val getCombiner: K => CoGroupCombiner = key => {
- map.changeValue(key, update)
- }
- rddIterators.foreach { case (it, depNum) =>
- while (it.hasNext) {
- val kv = it.next()
- getCombiner(kv._1)(depNum) += kv._2
- }
- }
- new InterruptibleIterator(context,
- map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
- } else {
- val map = createExternalMap(numRdds)
- for ((it, depNum) <- rddIterators) {
- map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
- }
- context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
- context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
- new InterruptibleIterator(context,
- map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
+ val map = createExternalMap(numRdds)
+ for ((it, depNum) <- rddIterators) {
+ map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
+ context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
+ context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ new InterruptibleIterator(context,
+ map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
private def createExternalMap(numRdds: Int)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index c089088f40..0b46634b8b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -24,7 +24,13 @@ import org.apache.spark.shuffle._
* A ShuffleManager using hashing, that creates one output file per reduce partition on each
* mapper (possibly reusing these across waves of tasks).
*/
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+ if (!conf.getBoolean("spark.shuffle.spill", true)) {
+ logWarning(
+ "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
+ " Shuffle will continue to spill to disk when necessary.")
+ }
private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index d7fab351ca..476cc1f303 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,11 +19,17 @@ package org.apache.spark.shuffle.sort
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency}
+import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency}
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.hash.HashShuffleReader
-private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+ if (!conf.getBoolean("spark.shuffle.spill", true)) {
+ logWarning(
+ "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
+ " Shuffle will continue to spill to disk when necessary.")
+ }
private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 31230d5978..2a30f751ff 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -116,8 +116,6 @@ private[spark] class ExternalSorter[K, V, C](
private val ser = Serializer.getSerializer(serializer)
private val serInstance = ser.newInstance()
- private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
@@ -229,10 +227,6 @@ private[spark] class ExternalSorter[K, V, C](
* @param usingMap whether we're using a map or buffer as our current in-memory collection
*/
private def maybeSpillCollection(usingMap: Boolean): Unit = {
- if (!spillingEnabled) {
- return
- }
-
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1110ca6051..1fd470cd3b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -147,7 +147,7 @@ class SparkSubmitSuite
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "beauty",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -166,7 +166,7 @@ class SparkSubmitSuite
mainClass should be ("org.apache.spark.deploy.yarn.Client")
classpath should have length (0)
sysProps("spark.app.name") should be ("beauty")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}
@@ -185,7 +185,7 @@ class SparkSubmitSuite
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "trill",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -206,7 +206,7 @@ class SparkSubmitSuite
sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
sysProps("SPARK_SUBMIT") should be ("true")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles standalone cluster mode") {
@@ -229,7 +229,7 @@ class SparkSubmitSuite
"--supervise",
"--driver-memory", "4g",
"--driver-cores", "5",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -253,9 +253,9 @@ class SparkSubmitSuite
sysProps.keys should contain ("spark.driver.memory")
sysProps.keys should contain ("spark.driver.cores")
sysProps.keys should contain ("spark.driver.supervise")
- sysProps.keys should contain ("spark.shuffle.spill")
+ sysProps.keys should contain ("spark.ui.enabled")
sysProps.keys should contain ("spark.submit.deployMode")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles standalone client mode") {
@@ -266,7 +266,7 @@ class SparkSubmitSuite
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -277,7 +277,7 @@ class SparkSubmitSuite
classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles mesos client mode") {
@@ -288,7 +288,7 @@ class SparkSubmitSuite
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
- "--conf", "spark.shuffle.spill=false",
+ "--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
@@ -299,7 +299,7 @@ class SparkSubmitSuite
classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
- sysProps("spark.shuffle.spill") should be ("false")
+ sysProps("spark.ui.enabled") should be ("false")
}
test("handles confs with flag equivalents") {
diff --git a/docs/configuration.md b/docs/configuration.md
index 3700051efb..5ec097c78a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -69,7 +69,7 @@ val sc = new SparkContext(new SparkConf())
Then, you can supply configuration values at runtime:
{% highlight bash %}
-./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
+./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
{% endhighlight %}
@@ -449,8 +449,8 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.memoryFraction</code></td>
<td>0.2</td>
<td>
- Fraction of Java heap to use for aggregation and cogroups during shuffles, if
- <code>spark.shuffle.spill</code> is true. At any given time, the collective size of
+ Fraction of Java heap to use for aggregation and cogroups during shuffles.
+ At any given time, the collective size of
all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
begin to spill to disk. If spills are often, consider increasing this value at the expense of
<code>spark.storage.memoryFraction</code>.
@@ -484,14 +484,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.shuffle.spill</code></td>
- <td>true</td>
- <td>
- If set to "true", limits the amount of memory used during reduces by spilling data out to disk.
- This spilling threshold is specified by <code>spark.shuffle.memoryFraction</code>.
- </td>
-</tr>
-<tr>
<td><code>spark.shuffle.spill.compress</code></td>
<td>true</td>
<td>
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 82d4243cc6..7ae9244c27 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1936,13 +1936,6 @@ that these options will be deprecated in future release as more optimizations ar
Configures the number of partitions to use when shuffling data for joins or aggregations.
</td>
</tr>
- <tr>
- <td><code>spark.sql.planner.externalSort</code></td>
- <td>true</td>
- <td>
- When true, performs sorts spilling to disk as needed otherwise sort each partition in memory.
- </td>
- </tr>
</table>
# Distributed SQL Engine
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ab5aab1e11..73d7d9a569 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -48,7 +48,7 @@ from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
+from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
@@ -580,12 +580,11 @@ class RDD(object):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
serializer = self._jrdd_deserializer
def sortPartition(iterator):
- sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ sort = ExternalSorter(memory * 0.9, serializer).sorted
return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
@@ -610,12 +609,11 @@ class RDD(object):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- spill = self._can_spill()
memory = self._memory_limit()
serializer = self._jrdd_deserializer
def sortPartition(iterator):
- sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ sort = ExternalSorter(memory * 0.9, serializer).sorted
return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
if numPartitions == 1:
@@ -1770,13 +1768,11 @@ class RDD(object):
numPartitions = self._defaultReducePartitions()
serializer = self.ctx.serializer
- spill = self._can_spill()
memory = self._memory_limit()
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combineLocally(iterator):
- merger = ExternalMerger(agg, memory * 0.9, serializer) \
- if spill else InMemoryMerger(agg)
+ merger = ExternalMerger(agg, memory * 0.9, serializer)
merger.mergeValues(iterator)
return merger.items()
@@ -1784,8 +1780,7 @@ class RDD(object):
shuffled = locally_combined.partitionBy(numPartitions)
def _mergeCombiners(iterator):
- merger = ExternalMerger(agg, memory, serializer) \
- if spill else InMemoryMerger(agg)
+ merger = ExternalMerger(agg, memory, serializer)
merger.mergeCombiners(iterator)
return merger.items()
@@ -1824,9 +1819,6 @@ class RDD(object):
return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
- def _can_spill(self):
- return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
-
def _memory_limit(self):
return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
@@ -1857,14 +1849,12 @@ class RDD(object):
a.extend(b)
return a
- spill = self._can_spill()
memory = self._memory_limit()
serializer = self._jrdd_deserializer
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combine(iterator):
- merger = ExternalMerger(agg, memory * 0.9, serializer) \
- if spill else InMemoryMerger(agg)
+ merger = ExternalMerger(agg, memory * 0.9, serializer)
merger.mergeValues(iterator)
return merger.items()
@@ -1872,8 +1862,7 @@ class RDD(object):
shuffled = locally_combined.partitionBy(numPartitions)
def groupByKey(it):
- merger = ExternalGroupBy(agg, memory, serializer)\
- if spill else InMemoryMerger(agg)
+ merger = ExternalGroupBy(agg, memory, serializer)
merger.mergeCombiners(it)
return merger.items()
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b8118bdb7c..e974cda9fc 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -131,36 +131,6 @@ class Merger(object):
raise NotImplementedError
-class InMemoryMerger(Merger):
-
- """
- In memory merger based on in-memory dict.
- """
-
- def __init__(self, aggregator):
- Merger.__init__(self, aggregator)
- self.data = {}
-
- def mergeValues(self, iterator):
- """ Combine the items by creator and combiner """
- # speed up attributes lookup
- d, creator = self.data, self.agg.createCombiner
- comb = self.agg.mergeValue
- for k, v in iterator:
- d[k] = comb(d[k], v) if k in d else creator(v)
-
- def mergeCombiners(self, iterator):
- """ Merge the combined items by mergeCombiner """
- # speed up attributes lookup
- d, comb = self.data, self.agg.mergeCombiners
- for k, v in iterator:
- d[k] = comb(d[k], v) if k in d else v
-
- def items(self):
- """ Return the merged items ad iterator """
- return iter(self.data.items())
-
-
def _compressed_serializer(self, serializer=None):
# always use PickleSerializer to simplify implementation
ser = PickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 647504c32f..f11aaf001c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,7 +62,7 @@ from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer,
CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer, \
PairDeserializer, CartesianDeserializer, AutoBatchedSerializer, AutoSerializer, \
FlattenedValuesSerializer
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
+from pyspark.shuffle import Aggregator, ExternalMerger, ExternalSorter
from pyspark import shuffle
from pyspark.profiler import BasicProfiler
@@ -95,17 +95,6 @@ class MergerTests(unittest.TestCase):
lambda x, y: x.append(y) or x,
lambda x, y: x.extend(y) or x)
- def test_in_memory(self):
- m = InMemoryMerger(self.agg)
- m.mergeValues(self.data)
- self.assertEqual(sum(sum(v) for k, v in m.items()),
- sum(xrange(self.N)))
-
- m = InMemoryMerger(self.agg)
- m.mergeCombiners(map(lambda x_y: (x_y[0], [x_y[1]]), self.data))
- self.assertEqual(sum(sum(v) for k, v in m.items()),
- sum(xrange(self.N)))
-
def test_small_dataset(self):
m = ExternalMerger(self.agg, 1000)
m.mergeValues(self.data)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9de75f4c4d..b9fb90d964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -330,11 +330,6 @@ private[spark] object SQLConf {
// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
- val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
- defaultValue = Some(true),
- doc = "When true, performs sorts spilling to disk as needed otherwise sort each partition in" +
- " memory.")
-
val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
defaultValue = Some(true),
doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
@@ -422,6 +417,7 @@ private[spark] object SQLConf {
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+ val EXTERNAL_SORT = "spark.sql.planner.externalSort"
}
}
@@ -476,8 +472,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
- private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
-
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e40d77689..41b215c792 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -312,8 +312,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
TungstenSort.supportsSchema(child.schema)) {
execution.TungstenSort(sortExprs, global, child)
- } else if (sqlContext.conf.externalSortEnabled) {
- execution.ExternalSort(sortExprs, global, child)
} else {
execution.Sort(sortExprs, global, child)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 95209e6634..af28e2dfa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -105,6 +105,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)
+ case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " +
+ s"External sort will continue to be used.")
+ Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true"))
+ }
+ (keyValueOutput, runFunc)
+
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 40ef7c3b53..27f26245a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -31,38 +31,12 @@ import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
// This file defines various sort operators.
////////////////////////////////////////////////////////////////////////////////////////////////////
-
-/**
- * Performs a sort on-heap.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- * if necessary.
- */
-case class Sort(
- sortOrder: Seq[SortOrder],
- global: Boolean,
- child: SparkPlan)
- extends UnaryNode {
- override def requiredChildDistribution: Seq[Distribution] =
- if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
- protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
- child.execute().mapPartitions( { iterator =>
- val ordering = newOrdering(sortOrder, child.output)
- iterator.map(_.copy()).toArray.sorted(ordering).iterator
- }, preservesPartitioning = true)
- }
-
- override def output: Seq[Attribute] = child.output
-
- override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
/**
* Performs a sort, spilling to disk as needed.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
*/
-case class ExternalSort(
+case class Sort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
@@ -93,7 +67,7 @@ case class ExternalSort(
}
/**
- * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
+ * Optimized version of [[Sort]] that operates on binary data (implemented as part of
* Project Tungsten).
*
* @param global when true performs a global sort of all partitions by shuffling the data first
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f9981356f3..05b4127cbc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -581,28 +581,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
mapData.collect().sortBy(_.data(1)).reverse.map(Row.fromTuple).toSeq)
}
- test("sorting") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false") {
- sortTest()
- }
- }
-
test("external sorting") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true") {
- sortTest()
- }
- }
-
- test("SPARK-6927 sorting with codegen on") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false",
- SQLConf.CODEGEN_ENABLED.key -> "true") {
- sortTest()
- }
+ sortTest()
}
test("SPARK-6927 external sorting with codegen on") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true",
- SQLConf.CODEGEN_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
sortTest()
}
}
@@ -1731,10 +1715,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("external sorting updates peak execution memory") {
- withSQLConf((SQLConf.EXTERNAL_SORT.key, "true")) {
- AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
- sortTest()
- }
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
+ sortTest()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 4492e37ad0..5dc37e5c3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -32,7 +32,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
case c: ConvertToSafe => c
}
- private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+ private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(!outputsSafe.outputsUnsafeRows)
private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(outputsUnsafe.outputsUnsafeRows)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 3073d492e6..847c188a30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -36,13 +36,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
checkAnswer(
input.toDF("a", "b", "c"),
- ExternalSort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+ Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
sortAnswers = false)
checkAnswer(
input.toDF("a", "b", "c"),
- ExternalSort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+ Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
sortAnswers = false)
}