-rw-r--r--  core/src/main/java/org/apache/spark/memory/MemoryConsumer.java                        |  19
-rw-r--r--  core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java                     |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala      | 111
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala             | 150
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/Spillable.scala                  |  48
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala |  15
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala        |  17
-rw-r--r--  project/MimaExcludes.scala                                                            |   3
8 files changed, 324 insertions(+), 46 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 36138cc9a2..840f13b394 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -45,7 +45,7 @@ public abstract class MemoryConsumer {
/**
* Returns the size of used memory in bytes.
*/
- long getUsed() {
+ protected long getUsed() {
return used;
}
@@ -130,4 +130,21 @@ public abstract class MemoryConsumer {
used -= page.size();
taskMemoryManager.freePage(page, this);
}
+
+ /**
+ * Allocates `size` bytes of on-heap execution memory.
+ */
+ public long acquireOnHeapMemory(long size) {
+ long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
+ used += granted;
+ return granted;
+ }
+
+ /**
+ * Releases `size` bytes of on-heap execution memory.
+ */
+ public void freeOnHeapMemory(long size) {
+ taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
+ used -= size;
+ }
}
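The two helpers above give any MemoryConsumer a bookkeeping-aware way to take and return on-heap execution memory. A minimal sketch of a consumer built on them; the GrowableBuffer class and its callers are hypothetical and not part of this patch:

    import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}

    // Hypothetical consumer that funds an in-memory buffer through the new
    // on-heap helpers. Note acquireOnHeapMemory() may grant less than asked.
    class GrowableBuffer(tmm: TaskMemoryManager) extends MemoryConsumer(tmm) {
      private var reserved = 0L

      def grow(bytes: Long): Long = {
        val granted = acquireOnHeapMemory(bytes)  // also increments `used`
        reserved += granted
        granted
      }

      def close(): Unit = {
        freeOnHeapMemory(reserved)                // also decrements `used`
        reserved = 0L
      }

      // Invoked by TaskMemoryManager when some consumer is short on memory;
      // this toy consumer has nothing it can spill, so it frees 0 bytes.
      override def spill(size: Long, trigger: MemoryConsumer): Long = 0L
    }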
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 9044bb4f4a..6b7d9aa20f 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -408,4 +408,11 @@ public class TaskMemoryManager {
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}
+
+ /**
+ * Returns the Tungsten memory mode.
+ */
+ public MemoryMode getTungstenMemoryMode() {
+ return tungstenMemoryMode;
+ }
}
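The new accessor exists so that a consumer holding only on-heap memory can tell whether a forced spill would actually help whoever triggered it. A small sketch of the guard, mirroring how Spillable.spill uses it further down:

    import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}

    // An on-heap collection cannot relieve off-heap memory pressure, so a
    // cooperative spill is only worthwhile when Tungsten memory is ON_HEAP.
    def canRelievePressure(tmm: TaskMemoryManager): Boolean =
      tmm.getTungstenMemoryMode == MemoryMode.ON_HEAP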
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 95351e9826..fc71f8365c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -61,10 +61,10 @@ class ExternalAppendOnlyMap[K, V, C](
blockManager: BlockManager = SparkEnv.get.blockManager,
context: TaskContext = TaskContext.get(),
serializerManager: SerializerManager = SparkEnv.get.serializerManager)
- extends Iterable[(K, C)]
+ extends Spillable[SizeTracker](context.taskMemoryManager())
with Serializable
with Logging
- with Spillable[SizeTracker] {
+ with Iterable[(K, C)] {
if (context == null) {
throw new IllegalStateException(
@@ -81,9 +81,7 @@ class ExternalAppendOnlyMap[K, V, C](
this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
}
- override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
-
- private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+ @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
private val diskBlockManager = blockManager.diskBlockManager
@@ -117,6 +115,8 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
+ @volatile private var readingIterator: SpillableIterator = null
+
/**
* Number of files this map has spilled so far.
* Exposed for testing.
@@ -182,6 +182,29 @@ class ExternalAppendOnlyMap[K, V, C](
* Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
*/
override protected[this] def spill(collection: SizeTracker): Unit = {
+ val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
+ val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
+ spilledMaps.append(diskMapIterator)
+ }
+
+ /**
+ * Force spilling the current in-memory collection to disk to release memory.
+ * It will be called by TaskMemoryManager when there is not enough memory for the task.
+ */
+ override protected[this] def forceSpill(): Boolean = {
+ assert(readingIterator != null)
+ val isSpilled = readingIterator.spill()
+ if (isSpilled) {
+ currentMap = null
+ }
+ isSpilled
+ }
+
+ /**
+ * Spill the in-memory Iterator to a temporary file on disk.
+ */
+ private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
+ : DiskMapIterator = {
val (blockId, file) = diskBlockManager.createTempLocalBlock()
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -202,9 +225,8 @@ class ExternalAppendOnlyMap[K, V, C](
var success = false
try {
- val it = currentMap.destructiveSortedIterator(keyComparator)
- while (it.hasNext) {
- val kv = it.next()
+ while (inMemoryIterator.hasNext) {
+ val kv = inMemoryIterator.next()
writer.write(kv._1, kv._2)
objectsWritten += 1
@@ -237,7 +259,17 @@ class ExternalAppendOnlyMap[K, V, C](
}
}
- spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+ new DiskMapIterator(file, blockId, batchSizes)
+ }
+
+ /**
+ * Returns a destructive iterator for iterating over the entries of this map.
+ * If this iterator is forced to spill to disk because there is not enough memory,
+ * it returns pairs from an on-disk map.
+ */
+ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
+ readingIterator = new SpillableIterator(inMemoryIterator)
+ readingIterator
}
/**
@@ -250,15 +282,18 @@ class ExternalAppendOnlyMap[K, V, C](
"ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
}
if (spilledMaps.isEmpty) {
- CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
+ CompletionIterator[(K, C), Iterator[(K, C)]](
+ destructiveIterator(currentMap.iterator), freeCurrentMap())
} else {
new ExternalIterator()
}
}
private def freeCurrentMap(): Unit = {
- currentMap = null // So that the memory can be garbage-collected
- releaseMemory()
+ if (currentMap != null) {
+ currentMap = null // So that the memory can be garbage-collected
+ releaseMemory()
+ }
}
/**
@@ -272,8 +307,8 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
- private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
- currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
+ private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
+ currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
inputStreams.foreach { it =>
@@ -532,8 +567,56 @@ class ExternalAppendOnlyMap[K, V, C](
context.addTaskCompletionListener(context => cleanup())
}
+ private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+ extends Iterator[(K, C)] {
+
+ private val SPILL_LOCK = new Object()
+
+ private var nextUpstream: Iterator[(K, C)] = null
+
+ private var cur: (K, C) = readNext()
+
+ private var hasSpilled: Boolean = false
+
+ def spill(): Boolean = SPILL_LOCK.synchronized {
+ if (hasSpilled) {
+ false
+ } else {
+ logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+ s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+ nextUpstream = spillMemoryIteratorToDisk(upstream)
+ hasSpilled = true
+ true
+ }
+ }
+
+ def readNext(): (K, C) = SPILL_LOCK.synchronized {
+ if (nextUpstream != null) {
+ upstream = nextUpstream
+ nextUpstream = null
+ }
+ if (upstream.hasNext) {
+ upstream.next()
+ } else {
+ null
+ }
+ }
+
+ override def hasNext: Boolean = cur != null
+
+ override def next(): (K, C) = {
+ val r = cur
+ cur = readNext()
+ r
+ }
+ }
+
/** Convenience function to hash the given (K, C) pair by the key. */
private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
+
+ override def toString(): String = {
+ this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
+ }
}
private[spark] object ExternalAppendOnlyMap {
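The heart of this change is the hand-off inside SpillableIterator: a forced spill drains the remaining in-memory pairs to disk under a lock, and the reader switches to the on-disk iterator at the next element boundary. A self-contained sketch of the same pattern, with a generic element type and a caller-supplied drainToDisk standing in for spillMemoryIteratorToDisk:

    // Generic sketch of the upstream-swap pattern used by SpillableIterator.
    // All state is guarded by `lock`, so a spill from the memory manager's
    // thread and a read from the task thread never see a half-swapped source.
    class SwappableIterator[T >: Null](
        private var upstream: Iterator[T],
        drainToDisk: Iterator[T] => Iterator[T]) extends Iterator[T] {

      private val lock = new Object()
      private var replacement: Iterator[T] = null  // set by spill()
      private var hasSpilled = false
      private var cur: T = readNext()  // declared last: uses the fields above

      // Returns true only on the first call; later calls are no-ops.
      def spill(): Boolean = lock.synchronized {
        if (hasSpilled) false
        else {
          replacement = drainToDisk(upstream)  // write remaining elements out
          hasSpilled = true
          true
        }
      }

      private def readNext(): T = lock.synchronized {
        if (replacement != null) { upstream = replacement; replacement = null }
        if (upstream.hasNext) upstream.next() else null
      }

      override def hasNext: Boolean = cur != null
      override def next(): T = { val r = cur; cur = readNext(); r }
    }

In the map above, drainToDisk corresponds to spillMemoryIteratorToDisk, whose DiskMapIterator the reader then continues from.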
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 916053f42d..4067acee73 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,10 +93,8 @@ private[spark] class ExternalSorter[K, V, C](
partitioner: Option[Partitioner] = None,
ordering: Option[Ordering[K]] = None,
serializer: Serializer = SparkEnv.get.serializer)
- extends Logging
- with Spillable[WritablePartitionedPairCollection[K, C]] {
-
- override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
+ extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
+ with Logging {
private val conf = SparkEnv.get.conf
@@ -126,8 +124,8 @@ private[spark] class ExternalSorter[K, V, C](
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
- private var map = new PartitionedAppendOnlyMap[K, C]
- private var buffer = new PartitionedPairBuffer[K, C]
+ @volatile private var map = new PartitionedAppendOnlyMap[K, C]
+ @volatile private var buffer = new PartitionedPairBuffer[K, C]
// Total spilling statistics
private var _diskBytesSpilled = 0L
@@ -137,6 +135,10 @@ private[spark] class ExternalSorter[K, V, C](
private var _peakMemoryUsedBytes: Long = 0L
def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
+ @volatile private var isShuffleSort: Boolean = true
+ private val forceSpillFiles = new ArrayBuffer[SpilledFile]
+ @volatile private var readingIterator: SpillableIterator = null
+
// A comparator for keys K that orders them within a partition to allow aggregation or sorting.
// Can be a partial ordering by hash code if a total ordering is not provided through by the
// user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
@@ -235,6 +237,34 @@ private[spark] class ExternalSorter[K, V, C](
* @param collection whichever collection we're using (map or buffer)
*/
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
+ val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
+ val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+ spills.append(spillFile)
+ }
+
+ /**
+ * Force spilling the current in-memory collection to disk to release memory.
+ * It will be called by TaskMemoryManager when there is not enough memory for the task.
+ */
+ override protected[this] def forceSpill(): Boolean = {
+ if (isShuffleSort) {
+ false
+ } else {
+ assert(readingIterator != null)
+ val isSpilled = readingIterator.spill()
+ if (isSpilled) {
+ map = null
+ buffer = null
+ }
+ isSpilled
+ }
+ }
+
+ /**
+ * Spill contents of in-memory iterator to a temporary file on disk.
+ */
+ private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+ : SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
@@ -271,12 +301,11 @@ private[spark] class ExternalSorter[K, V, C](
var success = false
try {
- val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
- while (it.hasNext) {
- val partitionId = it.nextPartition()
+ while (inMemoryIterator.hasNext) {
+ val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId < numPartitions,
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
- it.writeNext(writer)
+ inMemoryIterator.writeNext(writer)
elementsPerPartition(partitionId) += 1
objectsWritten += 1
@@ -308,7 +337,7 @@ private[spark] class ExternalSorter[K, V, C](
}
}
- spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))
+ SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
/**
@@ -600,6 +629,20 @@ private[spark] class ExternalSorter[K, V, C](
}
/**
+ * Returns a destructive iterator for iterating over the entries of this map.
+ * If this iterator is forced to spill to disk because there is not enough memory,
+ * it returns pairs from an on-disk map.
+ */
+ def destructiveIterator(memoryIterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
+ if (isShuffleSort) {
+ memoryIterator
+ } else {
+ readingIterator = new SpillableIterator(memoryIterator)
+ readingIterator
+ }
+ }
+
+ /**
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -618,21 +661,26 @@ private[spark] class ExternalSorter[K, V, C](
// we don't even need to sort by anything other than partition ID
if (!ordering.isDefined) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
- groupByPartition(collection.partitionedDestructiveSortedIterator(None))
+ groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
// We do need to sort by both partition ID and key
- groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
+ groupByPartition(destructiveIterator(
+ collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
- merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
+ merge(spills, destructiveIterator(
+ collection.partitionedDestructiveSortedIterator(comparator)))
}
}
/**
* Return an iterator over all the data written to this object, aggregated by our aggregator.
*/
- def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+ def iterator: Iterator[Product2[K, C]] = {
+ isShuffleSort = false
+ partitionedIterator.flatMap(pair => pair._2)
+ }
/**
* Write all the data added into this ExternalSorter into a file in the disk store. This is
@@ -689,11 +737,15 @@ private[spark] class ExternalSorter[K, V, C](
}
def stop(): Unit = {
- map = null // So that the memory can be garbage-collected
- buffer = null // So that the memory can be garbage-collected
spills.foreach(s => s.file.delete())
spills.clear()
- releaseMemory()
+ forceSpillFiles.foreach(s => s.file.delete())
+ forceSpillFiles.clear()
+ if (map != null || buffer != null) {
+ map = null // So that the memory can be garbage-collected
+ buffer = null // So that the memory can be garbage-collected
+ releaseMemory()
+ }
}
/**
@@ -727,4 +779,66 @@ private[spark] class ExternalSorter[K, V, C](
(elem._1._2, elem._2)
}
}
+
+ private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)])
+ extends Iterator[((Int, K), C)] {
+
+ private val SPILL_LOCK = new Object()
+
+ private var nextUpstream: Iterator[((Int, K), C)] = null
+
+ private var cur: ((Int, K), C) = readNext()
+
+ private var hasSpilled: Boolean = false
+
+ def spill(): Boolean = SPILL_LOCK.synchronized {
+ if (hasSpilled) {
+ false
+ } else {
+ val inMemoryIterator = new WritablePartitionedIterator {
+ private[this] var cur = if (upstream.hasNext) upstream.next() else null
+
+ def writeNext(writer: DiskBlockObjectWriter): Unit = {
+ writer.write(cur._1._2, cur._2)
+ cur = if (upstream.hasNext) upstream.next() else null
+ }
+
+ def hasNext(): Boolean = cur != null
+
+ def nextPartition(): Int = cur._1._1
+ }
+ logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+ s" it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+ val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+ forceSpillFiles.append(spillFile)
+ val spillReader = new SpillReader(spillFile)
+ nextUpstream = (0 until numPartitions).iterator.flatMap { p =>
+ val iterator = spillReader.readNextPartition()
+ iterator.map(cur => ((p, cur._1), cur._2))
+ }
+ hasSpilled = true
+ true
+ }
+ }
+
+ def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized {
+ if (nextUpstream != null) {
+ upstream = nextUpstream
+ nextUpstream = null
+ }
+ if (upstream.hasNext) {
+ upstream.next()
+ } else {
+ null
+ }
+ }
+
+ override def hasNext: Boolean = cur != null
+
+ override def next(): ((Int, K), C) = {
+ val r = cur
+ cur = readNext()
+ r
+ }
+ }
}
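One subtlety in the sorter's forced spill: a SpilledFile stores records grouped by partition, with the partition ID implicit in file position, so building nextUpstream must reattach it on read. A small sketch of that reassembly, where readPartition stands in for SpillReader.readNextPartition() and must be called once per partition, in order:

    // Rebuild ((partition, key), value) records from a partition-grouped
    // spill file. Laziness matters: a partition is only read back from disk
    // once the consumer actually reaches it.
    def reassemble[K, C](
        numPartitions: Int,
        readPartition: () => Iterator[(K, C)]): Iterator[((Int, K), C)] = {
      (0 until numPartitions).iterator.flatMap { p =>
        readPartition().map { case (k, c) => ((p, k), c) }
      }
    }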
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 25ca2037bb..aee6399eb0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -19,13 +19,14 @@ package org.apache.spark.util.collection
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
/**
* Spills contents of an in-memory collection to disk when the memory threshold
* has been exceeded.
*/
-private[spark] trait Spillable[C] extends Logging {
+private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
+ extends MemoryConsumer(taskMemoryManager) with Logging {
/**
* Spills the current in-memory collection to disk, and releases the memory.
*
@@ -33,16 +34,19 @@ private[spark] trait Spillable[C] extends Logging {
*/
protected def spill(collection: C): Unit
+ /**
+ * Force spilling the current in-memory collection to disk to release memory.
+ * It will be called by TaskMemoryManager when there is not enough memory for the task.
+ */
+ protected def forceSpill(): Boolean
+
// Number of elements read from input since last spill
protected def elementsRead: Long = _elementsRead
// Called by subclasses every time a record is read
// It's used for checking spilling frequency
protected def addElementsRead(): Unit = { _elementsRead += 1 }
- // Memory manager that can be used to acquire/release memory
- protected[this] def taskMemoryManager: TaskMemoryManager
-
// Initial threshold for the size of a collection before we start tracking its memory usage
// For testing only
private[this] val initialMemoryThreshold: Long =
@@ -55,13 +59,13 @@ private[spark] trait Spillable[C] extends Logging {
// Threshold for this collection's size in bytes before we start tracking its memory usage
// To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
- private[this] var myMemoryThreshold = initialMemoryThreshold
+ @volatile private[this] var myMemoryThreshold = initialMemoryThreshold
// Number of elements read from input since last spill
- private[this] var _elementsRead = 0L
+ @volatile private[this] var _elementsRead = 0L
// Number of bytes spilled in total
- private[this] var _memoryBytesSpilled = 0L
+ @volatile private[this] var _memoryBytesSpilled = 0L
// Number of spills
private[this] var _spillCount = 0
@@ -79,8 +83,7 @@ private[spark] trait Spillable[C] extends Logging {
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
- val granted =
- taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
+ val granted = acquireOnHeapMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
@@ -100,6 +103,27 @@ private[spark] trait Spillable[C] extends Logging {
}
/**
+ * Spill some data to disk to release memory. It will be called by TaskMemoryManager
+ * when there is not enough memory for the task.
+ */
+ override def spill(size: Long, trigger: MemoryConsumer): Long = {
+ if (trigger != this && taskMemoryManager.getTungstenMemoryMode == MemoryMode.ON_HEAP) {
+ val isSpilled = forceSpill()
+ if (!isSpilled) {
+ 0L
+ } else {
+ _elementsRead = 0
+ val freeMemory = myMemoryThreshold - initialMemoryThreshold
+ _memoryBytesSpilled += freeMemory
+ releaseMemory()
+ freeMemory
+ }
+ } else {
+ 0L
+ }
+ }
+
+ /**
* @return number of bytes spilled in total
*/
def memoryBytesSpilled: Long = _memoryBytesSpilled
@@ -108,9 +132,7 @@ private[spark] trait Spillable[C] extends Logging {
* Release our memory back to the execution pool so that other tasks can grab it.
*/
def releaseMemory(): Unit = {
- // The amount we requested does not include the initial memory tracking threshold
- taskMemoryManager.releaseExecutionMemory(
- myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
+ freeOnHeapMemory(myMemoryThreshold - initialMemoryThreshold)
myMemoryThreshold = initialMemoryThreshold
}
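For reference, the request arithmetic in maybeSpill, unchanged by this patch apart from routing through acquireOnHeapMemory, tries to double the collection's footprint each time it crosses the threshold. A worked example with illustrative numbers:

    // The collection has grown to 48 MB against a 40 MB reservation.
    val currentMemory     = 48L * 1024 * 1024
    var myMemoryThreshold = 40L * 1024 * 1024
    val amountToRequest = 2 * currentMemory - myMemoryThreshold  // 56 MB
    // A full grant moves the threshold to exactly 2 * currentMemory:
    myMemoryThreshold += amountToRequest
    assert(myMemoryThreshold == 2 * currentMemory)               // 96 MB
    // If the pool grants less and currentMemory >= myMemoryThreshold still
    // holds, maybeSpill falls through to spill() and releases the memory.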
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 2410118fb7..5141e36d9e 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -416,4 +416,19 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("force to spill for external aggregation") {
+ val conf = createSparkConf(loadDefaults = false)
+ .set("spark.shuffle.memoryFraction", "0.01")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.testing.memory", "100000000")
+ .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+ sc = new SparkContext("local", "test", conf)
+ val N = 2e5.toInt
+ sc.parallelize(1 to N, 2)
+ .map { i => (i, i) }
+ .groupByKey()
+ .reduceByKey(_ ++ _)
+ .count()
+ }
+
}
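The test works by shrinking the legacy-mode shuffle pool until 2e5 pairs cannot fit. A back-of-envelope sketch, assuming the legacy sizing formula systemMemory * memoryFraction * safetyFraction with the default safetyFraction of 0.8:

    val systemMemory   = 100000000L  // spark.testing.memory
    val memoryFraction = 0.01        // spark.shuffle.memoryFraction
    val safetyFraction = 0.8         // spark.shuffle.safetyFraction default
    val maxShuffleMemory =
      (systemMemory * memoryFraction * safetyFraction).toLong
    // ~800 KB of execution memory shared by all tasks: far too little for
    // 200,000 pairs, so the map is forced to spill while it is being read.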
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index a1a7ac97d9..4dd8e31c27 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -608,4 +608,21 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
}
}
+
+ test("force to spill for external sorter") {
+ val conf = createSparkConf(loadDefaults = false, kryo = false)
+ .set("spark.shuffle.memoryFraction", "0.01")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.testing.memory", "100000000")
+ .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+ sc = new SparkContext("local", "test", conf)
+ val N = 2e5.toInt
+ val p = new org.apache.spark.HashPartitioner(2)
+ val p2 = new org.apache.spark.HashPartitioner(3)
+ sc.parallelize(1 to N, 3)
+ .map { x => (x % 100000) -> x.toLong }
+ .repartitionAndSortWithinPartitions(p)
+ .repartitionAndSortWithinPartitions(p2)
+ .count()
+ }
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 65b0a97e4d..3c9f1532f9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -660,6 +660,9 @@ object MimaExcludes {
// SPARK-14704: Create accumulators in TaskMetrics
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.this")
+ ) ++ Seq(
+ // [SPARK-4452][Core] Shuffle data structures can starve others on the same thread for memory
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
)
case v if v.startsWith("1.6") =>
Seq(