author      Andrew Or <andrewor14@gmail.com>   2014-10-07 12:52:10 -0700
committer   Andrew Or <andrewor14@gmail.com>   2014-10-07 12:52:10 -0700
commit      553737c6e6d5ffa3b52a9888444f4beece5c5b1a (patch)
tree        ef588b2556f7c7bdf0b519be6326e6086995c163 /core/src
parent      bc87cc410fae59660c13b6ae1c14204df77237b8 (diff)
[SPARK-3825] Log more detail when unrolling a block fails
Before:
```
14/10/06 16:45:42 WARN CacheManager: Not enough space to cache partition rdd_0_2 in memory! Free memory is 481861527 bytes.
```

After:
```
14/10/07 11:08:24 WARN MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 68.8 MB so far)
14/10/07 11:08:24 INFO MemoryStore: Memory use = 1088.0 B (blocks) + 445.1 MB (scratch space shared across 8 thread(s)) = 445.1 MB. Storage limit = 459.5 MB.
```

Author: Andrew Or <andrewor14@gmail.com>

Closes #2688 from andrewor14/cache-log-message and squashes the following commits:

28e33d6 [Andrew Or] Shy away from "unrolling"
5638c49 [Andrew Or] Grammar
39a0c28 [Andrew Or] Log more detail when unrolling a block fails
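For context, here is a minimal, hypothetical way to trigger the messages above (not part of this commit): persist a partition that is larger than the memory store, so unrolling fails and the new WARN/INFO lines are emitted. The app name, element sizes, and partition counts below are illustrative assumptions only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object UnrollLogExample {
  def main(args: Array[String]): Unit = {
    // A small local setup keeps the memory store easy to overflow.
    val conf = new SparkConf().setMaster("local[2]").setAppName("unroll-log-example")
    val sc = new SparkContext(conf)

    // Each element is ~1 MB; a partition of ~1000 elements is far larger than
    // the available storage memory, so caching falls back and logs the new
    // "Not enough space to cache ..." warning plus the memory-usage summary.
    val rdd = sc.parallelize(1 to 2000, 2)
      .map(_ => new Array[Byte](1024 * 1024))
      .persist(StorageLevel.MEMORY_ONLY)

    rdd.count() // materialize the RDD so the cache attempt (and logging) happens
    sc.stop()
  }
}
```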
Diffstat (limited to 'core/src')
-rw-r--r--    core/src/main/scala/org/apache/spark/CacheManager.scala           2
-rw-r--r--    core/src/main/scala/org/apache/spark/storage/MemoryStore.scala   45
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index f8584b90ca..d89bb50076 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           arr.iterator.asInstanceOf[Iterator[T]]
         case Right(it) =>
           // There is not enough space to cache this partition in memory
-          logWarning(s"Not enough space to cache partition $key in memory! " +
-            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
           val returnValues = it.asInstanceOf[Iterator[T]]
           if (putLevel.useDisk) {
             logWarning(s"Persisting partition $key to disk instead.")
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 0a09c24d61..edbc729c17 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         PutResult(res.size, res.data, droppedBlocks)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
-        logWarning(s"Not enough space to store block $blockId in memory! " +
-          s"Free memory is $freeMemory bytes.")
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
           val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
@@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         Left(vector.toArray)
       } else {
         // We ran out of space while unrolling the values for this block
+        logUnrollFailureMessage(blockId, vector.estimateSize())
         Right(vector.iterator ++ values)
       }
@@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Reserve additional memory for unrolling blocks used by this thread.
    * Return whether the request is granted.
    */
-  private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
     accountingLock.synchronized {
       val granted = freeMemory > currentUnrollMemory + memory
       if (granted) {
@@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Release memory used by this thread for unrolling blocks.
    * If the amount is not specified, remove the current thread's allocation altogether.
    */
-  private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
     val threadId = Thread.currentThread().getId
     accountingLock.synchronized {
       if (memory < 0) {
@@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   /**
    * Return the amount of memory currently occupied for unrolling blocks across all threads.
    */
-  private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+  def currentUnrollMemory: Long = accountingLock.synchronized {
     unrollMemoryMap.values.sum
   }
 
   /**
    * Return the amount of memory currently occupied for unrolling blocks by this thread.
    */
-  private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+  def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
     unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
   }
+
+  /**
+   * Return the number of threads currently unrolling blocks.
+   */
+  def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+
+  /**
+   * Log information about current memory usage.
+   */
+  def logMemoryUsage(): Unit = {
+    val blocksMemory = currentMemory
+    val unrollMemory = currentUnrollMemory
+    val totalMemory = blocksMemory + unrollMemory
+    logInfo(
+      s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " +
+      s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " +
+      s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " +
+      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
+    )
+  }
+
+  /**
+   * Log a warning for failing to unroll a block.
+   *
+   * @param blockId ID of the block we are trying to unroll.
+   * @param finalVectorSize Final size of the vector before unrolling failed.
+   */
+  def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
+    logWarning(
+      s"Not enough space to cache $blockId in memory! " +
+      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
+    )
+    logMemoryUsage()
+  }
 }
 
 private[spark] case class ResultWithDroppedBlocks(