aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLianhui Wang <lianhuiwang09@gmail.com>2016-04-21 10:02:23 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-21 10:02:23 -0700
commit4f369176b750c980682a6be468cefa8627769c72 (patch)
tree6afcbb5af6f541e660cf756dfaff192f4285d45d
parent649335d6c1dd0884bf6f6618cd56fb480fe40886 (diff)
downloadspark-4f369176b750c980682a6be468cefa8627769c72.tar.gz
spark-4f369176b750c980682a6be468cefa8627769c72.tar.bz2
spark-4f369176b750c980682a6be468cefa8627769c72.zip
[SPARK-4452] [CORE] Shuffle data structures can starve others on the same thread for memory
## What changes were proposed in this pull request? In #9241 It implemented a mechanism to call spill() on those SQL operators that support spilling if there is not enough memory for execution. But ExternalSorter and AppendOnlyMap in Spark core are not worked. So this PR make them benefit from #9241. Now when there is not enough memory for execution, it can get memory by spilling ExternalSorter and AppendOnlyMap in Spark core. ## How was this patch tested? add two unit tests for it. Author: Lianhui Wang <lianhuiwang09@gmail.com> Closes #10024 from lianhuiwang/SPARK-4452-2.
-rw-r--r--core/src/main/java/org/apache/spark/memory/MemoryConsumer.java19
-rw-r--r--core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java7
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala111
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala150
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/Spillable.scala48
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala17
-rw-r--r--project/MimaExcludes.scala3
8 files changed, 324 insertions, 46 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 36138cc9a2..840f13b394 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -45,7 +45,7 @@ public abstract class MemoryConsumer {
/**
* Returns the size of used memory in bytes.
*/
- long getUsed() {
+ protected long getUsed() {
return used;
}
@@ -130,4 +130,21 @@ public abstract class MemoryConsumer {
used -= page.size();
taskMemoryManager.freePage(page, this);
}
+
+ /**
+ * Allocates a heap memory of `size`.
+ */
+ public long acquireOnHeapMemory(long size) {
+ long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
+ used += granted;
+ return granted;
+ }
+
+ /**
+ * Release N bytes of heap memory.
+ */
+ public void freeOnHeapMemory(long size) {
+ taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
+ used -= size;
+ }
}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 9044bb4f4a..6b7d9aa20f 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -408,4 +408,11 @@ public class TaskMemoryManager {
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}
+
+ /**
+ * Returns Tungsten memory mode
+ */
+ public MemoryMode getTungstenMemoryMode(){
+ return tungstenMemoryMode;
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 95351e9826..fc71f8365c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -61,10 +61,10 @@ class ExternalAppendOnlyMap[K, V, C](
blockManager: BlockManager = SparkEnv.get.blockManager,
context: TaskContext = TaskContext.get(),
serializerManager: SerializerManager = SparkEnv.get.serializerManager)
- extends Iterable[(K, C)]
+ extends Spillable[SizeTracker](context.taskMemoryManager())
with Serializable
with Logging
- with Spillable[SizeTracker] {
+ with Iterable[(K, C)] {
if (context == null) {
throw new IllegalStateException(
@@ -81,9 +81,7 @@ class ExternalAppendOnlyMap[K, V, C](
this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
}
- override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
-
- private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+ @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
private val diskBlockManager = blockManager.diskBlockManager
@@ -117,6 +115,8 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
+ @volatile private var readingIterator: SpillableIterator = null
+
/**
* Number of files this map has spilled so far.
* Exposed for testing.
@@ -182,6 +182,29 @@ class ExternalAppendOnlyMap[K, V, C](
* Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
*/
override protected[this] def spill(collection: SizeTracker): Unit = {
+ val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
+ val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
+ spilledMaps.append(diskMapIterator)
+ }
+
+ /**
+ * Force to spilling the current in-memory collection to disk to release memory,
+ * It will be called by TaskMemoryManager when there is not enough memory for the task.
+ */
+ override protected[this] def forceSpill(): Boolean = {
+ assert(readingIterator != null)
+ val isSpilled = readingIterator.spill()
+ if (isSpilled) {
+ currentMap = null
+ }
+ isSpilled
+ }
+
+ /**
+ * Spill the in-memory Iterator to a temporary file on disk.
+ */
+ private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
+ : DiskMapIterator = {
val (blockId, file) = diskBlockManager.createTempLocalBlock()
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -202,9 +225,8 @@ class ExternalAppendOnlyMap[K, V, C](
var success = false
try {
- val it = currentMap.destructiveSortedIterator(keyComparator)
- while (it.hasNext) {
- val kv = it.next()
+ while (inMemoryIterator.hasNext) {
+ val kv = inMemoryIterator.next()
writer.write(kv._1, kv._2)
objectsWritten += 1
@@ -237,7 +259,17 @@ class ExternalAppendOnlyMap[K, V, C](
}
}
- spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+ new DiskMapIterator(file, blockId, batchSizes)
+ }
+
+ /**
+ * Returns a destructive iterator for iterating over the entries of this map.
+ * If this iterator is forced spill to disk to release memory when there is not enough memory,
+ * it returns pairs from an on-disk map.
+ */
+ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
+ readingIterator = new SpillableIterator(inMemoryIterator)
+ readingIterator
}
/**
@@ -250,15 +282,18 @@ class ExternalAppendOnlyMap[K, V, C](
"ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
}
if (spilledMaps.isEmpty) {
- CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
+ CompletionIterator[(K, C), Iterator[(K, C)]](
+ destructiveIterator(currentMap.iterator), freeCurrentMap())
} else {
new ExternalIterator()
}
}
private def freeCurrentMap(): Unit = {
- currentMap = null // So that the memory can be garbage-collected
- releaseMemory()
+ if (currentMap != null) {
+ currentMap = null // So that the memory can be garbage-collected
+ releaseMemory()
+ }
}
/**
@@ -272,8 +307,8 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
- private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
- currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
+ private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
+ currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
inputStreams.foreach { it =>
@@ -532,8 +567,56 @@ class ExternalAppendOnlyMap[K, V, C](
context.addTaskCompletionListener(context => cleanup())
}
+ private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+ extends Iterator[(K, C)] {
+
+ private val SPILL_LOCK = new Object()
+
+ private var nextUpstream: Iterator[(K, C)] = null
+
+ private var cur: (K, C) = readNext()
+
+ private var hasSpilled: Boolean = false
+
+ def spill(): Boolean = SPILL_LOCK.synchronized {
+ if (hasSpilled) {
+ false
+ } else {
+ logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+ s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+ nextUpstream = spillMemoryIteratorToDisk(upstream)
+ hasSpilled = true
+ true
+ }
+ }
+
+ def readNext(): (K, C) = SPILL_LOCK.synchronized {
+ if (nextUpstream != null) {
+ upstream = nextUpstream
+ nextUpstream = null
+ }
+ if (upstream.hasNext) {
+ upstream.next()
+ } else {
+ null
+ }
+ }
+
+ override def hasNext(): Boolean = cur != null
+
+ override def next(): (K, C) = {
+ val r = cur
+ cur = readNext()
+ r
+ }
+ }
+
/** Convenience function to hash the given (K, C) pair by the key. */
private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
+
+ override def toString(): String = {
+ this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
+ }
}
private[spark] object ExternalAppendOnlyMap {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 916053f42d..4067acee73 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,10 +93,8 @@ private[spark] class ExternalSorter[K, V, C](
partitioner: Option[Partitioner] = None,
ordering: Option[Ordering[K]] = None,
serializer: Serializer = SparkEnv.get.serializer)
- extends Logging
- with Spillable[WritablePartitionedPairCollection[K, C]] {
-
- override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
+ extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
+ with Logging {
private val conf = SparkEnv.get.conf
@@ -126,8 +124,8 @@ private[spark] class ExternalSorter[K, V, C](
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
- private var map = new PartitionedAppendOnlyMap[K, C]
- private var buffer = new PartitionedPairBuffer[K, C]
+ @volatile private var map = new PartitionedAppendOnlyMap[K, C]
+ @volatile private var buffer = new PartitionedPairBuffer[K, C]
// Total spilling statistics
private var _diskBytesSpilled = 0L
@@ -137,6 +135,10 @@ private[spark] class ExternalSorter[K, V, C](
private var _peakMemoryUsedBytes: Long = 0L
def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
+ @volatile private var isShuffleSort: Boolean = true
+ private val forceSpillFiles = new ArrayBuffer[SpilledFile]
+ @volatile private var readingIterator: SpillableIterator = null
+
// A comparator for keys K that orders them within a partition to allow aggregation or sorting.
// Can be a partial ordering by hash code if a total ordering is not provided through by the
// user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
@@ -235,6 +237,34 @@ private[spark] class ExternalSorter[K, V, C](
* @param collection whichever collection we're using (map or buffer)
*/
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
+ val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
+ val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+ spills.append(spillFile)
+ }
+
+ /**
+ * Force to spilling the current in-memory collection to disk to release memory,
+ * It will be called by TaskMemoryManager when there is not enough memory for the task.
+ */
+ override protected[this] def forceSpill(): Boolean = {
+ if (isShuffleSort) {
+ false
+ } else {
+ assert(readingIterator != null)
+ val isSpilled = readingIterator.spill()
+ if (isSpilled) {
+ map = null
+ buffer = null
+ }
+ isSpilled
+ }
+ }
+
+ /**
+ * Spill contents of in-memory iterator to a temporary file on disk.
+ */
+ private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+ : SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
@@ -271,12 +301,11 @@ private[spark] class ExternalSorter[K, V, C](
var success = false
try {
- val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
- while (it.hasNext) {
- val partitionId = it.nextPartition()
+ while (inMemoryIterator.hasNext) {
+ val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId < numPartitions,
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
- it.writeNext(writer)
+ inMemoryIterator.writeNext(writer)
elementsPerPartition(partitionId) += 1
objectsWritten += 1
@@ -308,7 +337,7 @@ private[spark] class ExternalSorter[K, V, C](
}
}
- spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))
+ SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
/**
@@ -600,6 +629,20 @@ private[spark] class ExternalSorter[K, V, C](
}
/**
+ * Returns a destructive iterator for iterating over the entries of this map.
+ * If this iterator is forced spill to disk to release memory when there is not enough memory,
+ * it returns pairs from an on-disk map.
+ */
+ def destructiveIterator(memoryIterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
+ if (isShuffleSort) {
+ memoryIterator
+ } else {
+ readingIterator = new SpillableIterator(memoryIterator)
+ readingIterator
+ }
+ }
+
+ /**
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -618,21 +661,26 @@ private[spark] class ExternalSorter[K, V, C](
// we don't even need to sort by anything other than partition ID
if (!ordering.isDefined) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
- groupByPartition(collection.partitionedDestructiveSortedIterator(None))
+ groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
// We do need to sort by both partition ID and key
- groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
+ groupByPartition(destructiveIterator(
+ collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
- merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
+ merge(spills, destructiveIterator(
+ collection.partitionedDestructiveSortedIterator(comparator)))
}
}
/**
* Return an iterator over all the data written to this object, aggregated by our aggregator.
*/
- def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+ def iterator: Iterator[Product2[K, C]] = {
+ isShuffleSort = false
+ partitionedIterator.flatMap(pair => pair._2)
+ }
/**
* Write all the data added into this ExternalSorter into a file in the disk store. This is
@@ -689,11 +737,15 @@ private[spark] class ExternalSorter[K, V, C](
}
def stop(): Unit = {
- map = null // So that the memory can be garbage-collected
- buffer = null // So that the memory can be garbage-collected
spills.foreach(s => s.file.delete())
spills.clear()
- releaseMemory()
+ forceSpillFiles.foreach(s => s.file.delete())
+ forceSpillFiles.clear()
+ if (map != null || buffer != null) {
+ map = null // So that the memory can be garbage-collected
+ buffer = null // So that the memory can be garbage-collected
+ releaseMemory()
+ }
}
/**
@@ -727,4 +779,66 @@ private[spark] class ExternalSorter[K, V, C](
(elem._1._2, elem._2)
}
}
+
+ private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)])
+ extends Iterator[((Int, K), C)] {
+
+ private val SPILL_LOCK = new Object()
+
+ private var nextUpstream: Iterator[((Int, K), C)] = null
+
+ private var cur: ((Int, K), C) = readNext()
+
+ private var hasSpilled: Boolean = false
+
+ def spill(): Boolean = SPILL_LOCK.synchronized {
+ if (hasSpilled) {
+ false
+ } else {
+ val inMemoryIterator = new WritablePartitionedIterator {
+ private[this] var cur = if (upstream.hasNext) upstream.next() else null
+
+ def writeNext(writer: DiskBlockObjectWriter): Unit = {
+ writer.write(cur._1._2, cur._2)
+ cur = if (upstream.hasNext) upstream.next() else null
+ }
+
+ def hasNext(): Boolean = cur != null
+
+ def nextPartition(): Int = cur._1._1
+ }
+ logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+ s" it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+ val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+ forceSpillFiles.append(spillFile)
+ val spillReader = new SpillReader(spillFile)
+ nextUpstream = (0 until numPartitions).iterator.flatMap { p =>
+ val iterator = spillReader.readNextPartition()
+ iterator.map(cur => ((p, cur._1), cur._2))
+ }
+ hasSpilled = true
+ true
+ }
+ }
+
+ def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized {
+ if (nextUpstream != null) {
+ upstream = nextUpstream
+ nextUpstream = null
+ }
+ if (upstream.hasNext) {
+ upstream.next()
+ } else {
+ null
+ }
+ }
+
+ override def hasNext(): Boolean = cur != null
+
+ override def next(): ((Int, K), C) = {
+ val r = cur
+ cur = readNext()
+ r
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 25ca2037bb..aee6399eb0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -19,13 +19,14 @@ package org.apache.spark.util.collection
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
/**
* Spills contents of an in-memory collection to disk when the memory threshold
* has been exceeded.
*/
-private[spark] trait Spillable[C] extends Logging {
+private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
+ extends MemoryConsumer(taskMemoryManager) with Logging {
/**
* Spills the current in-memory collection to disk, and releases the memory.
*
@@ -33,16 +34,19 @@ private[spark] trait Spillable[C] extends Logging {
*/
protected def spill(collection: C): Unit
+ /**
+ * Force to spilling the current in-memory collection to disk to release memory,
+ * It will be called by TaskMemoryManager when there is not enough memory for the task.
+ */
+ protected def forceSpill(): Boolean
+
// Number of elements read from input since last spill
- protected def elementsRead: Long = _elementsRead
+ @volatile protected def elementsRead: Long = _elementsRead
// Called by subclasses every time a record is read
// It's used for checking spilling frequency
protected def addElementsRead(): Unit = { _elementsRead += 1 }
- // Memory manager that can be used to acquire/release memory
- protected[this] def taskMemoryManager: TaskMemoryManager
-
// Initial threshold for the size of a collection before we start tracking its memory usage
// For testing only
private[this] val initialMemoryThreshold: Long =
@@ -55,13 +59,13 @@ private[spark] trait Spillable[C] extends Logging {
// Threshold for this collection's size in bytes before we start tracking its memory usage
// To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
- private[this] var myMemoryThreshold = initialMemoryThreshold
+ @volatile private[this] var myMemoryThreshold = initialMemoryThreshold
// Number of elements read from input since last spill
private[this] var _elementsRead = 0L
// Number of bytes spilled in total
- private[this] var _memoryBytesSpilled = 0L
+ @volatile private[this] var _memoryBytesSpilled = 0L
// Number of spills
private[this] var _spillCount = 0
@@ -79,8 +83,7 @@ private[spark] trait Spillable[C] extends Logging {
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
- val granted =
- taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
+ val granted = acquireOnHeapMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
@@ -100,6 +103,27 @@ private[spark] trait Spillable[C] extends Logging {
}
/**
+ * Spill some data to disk to release memory, which will be called by TaskMemoryManager
+ * when there is not enough memory for the task.
+ */
+ override def spill(size: Long, trigger: MemoryConsumer): Long = {
+ if (trigger != this && taskMemoryManager.getTungstenMemoryMode == MemoryMode.ON_HEAP) {
+ val isSpilled = forceSpill()
+ if (!isSpilled) {
+ 0L
+ } else {
+ _elementsRead = 0
+ val freeMemory = myMemoryThreshold - initialMemoryThreshold
+ _memoryBytesSpilled += freeMemory
+ releaseMemory()
+ freeMemory
+ }
+ } else {
+ 0L
+ }
+ }
+
+ /**
* @return number of bytes spilled in total
*/
def memoryBytesSpilled: Long = _memoryBytesSpilled
@@ -108,9 +132,7 @@ private[spark] trait Spillable[C] extends Logging {
* Release our memory back to the execution pool so that other tasks can grab it.
*/
def releaseMemory(): Unit = {
- // The amount we requested does not include the initial memory tracking threshold
- taskMemoryManager.releaseExecutionMemory(
- myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
+ freeOnHeapMemory(myMemoryThreshold - initialMemoryThreshold)
myMemoryThreshold = initialMemoryThreshold
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 2410118fb7..5141e36d9e 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -416,4 +416,19 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("force to spill for external aggregation") {
+ val conf = createSparkConf(loadDefaults = false)
+ .set("spark.shuffle.memoryFraction", "0.01")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.testing.memory", "100000000")
+ .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+ sc = new SparkContext("local", "test", conf)
+ val N = 2e5.toInt
+ sc.parallelize(1 to N, 2)
+ .map { i => (i, i) }
+ .groupByKey()
+ .reduceByKey(_ ++ _)
+ .count()
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index a1a7ac97d9..4dd8e31c27 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -608,4 +608,21 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
}
}
+
+ test("force to spill for external sorter") {
+ val conf = createSparkConf(loadDefaults = false, kryo = false)
+ .set("spark.shuffle.memoryFraction", "0.01")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.testing.memory", "100000000")
+ .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+ sc = new SparkContext("local", "test", conf)
+ val N = 2e5.toInt
+ val p = new org.apache.spark.HashPartitioner(2)
+ val p2 = new org.apache.spark.HashPartitioner(3)
+ sc.parallelize(1 to N, 3)
+ .map { x => (x % 100000) -> x.toLong }
+ .repartitionAndSortWithinPartitions(p)
+ .repartitionAndSortWithinPartitions(p2)
+ .count()
+ }
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 65b0a97e4d..3c9f1532f9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -660,6 +660,9 @@ object MimaExcludes {
// SPARK-14704: Create accumulators in TaskMetrics
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.this")
+ ) ++ Seq(
+ // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
)
case v if v.startsWith("1.6") =>
Seq(