aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 71ab2a3e3b..be8f6529f7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -106,6 +106,7 @@ class ExternalAppendOnlyMap[K, V, C](
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
+ private val threadId = Thread.currentThread().getId
/**
* Insert the given key and value into the map.
@@ -128,7 +129,6 @@ class ExternalAppendOnlyMap[K, V, C](
// Atomically check whether there is sufficient memory in the global pool for
// this map to grow and, if possible, allocate the required amount
shuffleMemoryMap.synchronized {
- val threadId = Thread.currentThread().getId
val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
val availableMemory = maxMemoryThreshold -
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
@@ -153,8 +153,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def spill(mapSize: Long) {
spillCount += 1
- logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
- .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+ logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
+ .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
var objectsWritten = 0