author    Josh Rosen <joshrosen@databricks.com>  2016-01-18 13:34:12 -0800
committer Andrew Or <andrew@databricks.com>      2016-01-18 13:34:12 -0800
commit    b8cb548a4394221f2b029c84c7f130774da69e3a (patch)
tree      a9c08ccd7dc2fe0e9a060a5ea3d617012760b3d7 /core
parent    302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82 (diff)
[SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager
This patch refactors portions of the BlockManager and CacheManager in order to avoid
having to pass `evictedBlocks` lists throughout the code. It appears that these lists
were only consumed by `TaskContext.taskMetrics`, so the new code now directly updates
the metrics from the lower-level BlockManager methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10776 from JoshRosen/SPARK-10985.
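Before diving into the diff, the shape of the change can be sketched in a few lines. This is a simplified stand-in (hypothetical `BlockId`/`BlockStatus` case classes, not Spark's real ones) contrasting the two calling conventions the patch swaps between:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for org.apache.spark.storage.{BlockId, BlockStatus}.
final case class BlockId(name: String)
final case class BlockStatus(memSize: Long, diskSize: Long)

trait MemoryManagerBefore {
  // Every caller had to allocate and thread a buffer just to learn
  // which blocks were evicted along the way.
  def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
}

trait MemoryManagerAfter {
  // Evictions are now reported to TaskMetrics at the point where they
  // happen, so the bookkeeping parameter disappears from every layer above.
  def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
}
```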
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/CacheManager.scala                     | 20
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/MemoryManager.scala             | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala       | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala         | 30
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala      | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala             | 71
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/MemoryStore.scala              | 77
-rw-r--r-- core/src/test/scala/org/apache/spark/CacheManagerSuite.scala                |  9
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala        | 23
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala  | 24
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala         | 10
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 30
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala        | 55
13 files changed, 166 insertions(+), 237 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 36b536e89c..d92d8b0eef 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -18,7 +18,6 @@
package org.apache.spark
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -68,12 +67,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
- // Otherwise, cache the values and keep track of any updates in block statuses
- val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
- val metrics = context.taskMetrics
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
+ // Otherwise, cache the values
+ val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
} finally {
@@ -135,7 +130,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
key: BlockId,
values: Iterator[T],
level: StorageLevel,
- updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
val putLevel = effectiveStorageLevel.getOrElse(level)
@@ -144,8 +138,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
- updatedBlocks ++=
- blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
+ blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
@@ -163,11 +156,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
- blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+ blockManager.memoryStore.unrollSafely(key, values) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
- updatedBlocks ++=
- blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+ blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
@@ -176,7 +168,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
- putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+ putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
} else {
returnValues
}
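The surviving control flow of `putInBlockManager` is worth seeing in isolation: unroll cautiously, cache the array on success, otherwise recurse once with a disk-only effective level. A minimal sketch with the stores abstracted away as function parameters (hypothetical names, not Spark's API):

```scala
// Hypothetical simplification of putInBlockManager's branches after this patch.
sealed trait UnrollResult
final case class Unrolled(values: Array[Any]) extends UnrollResult
final case class TooBig(remaining: Iterator[Any]) extends UnrollResult

def putInBlockManagerSketch(
    unrollSafely: Iterator[Any] => UnrollResult,
    putArray: Array[Any] => Unit,
    putDiskOnly: Iterator[Any] => Iterator[Any],
    values: Iterator[Any],
    useDisk: Boolean): Iterator[Any] = {
  unrollSafely(values) match {
    case Unrolled(arr) =>
      putArray(arr) // fully unrolled: cache the array in memory
      arr.iterator
    case TooBig(it) if useDisk =>
      putDiskOnly(it) // not enough memory: persist to disk instead
    case TooBig(it) =>
      it // no disk fallback: hand the values back uncached
  }
}
```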
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 33f8b9f16c..b5adbd88a2 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,10 +19,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable
-
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.storage.{BlockId, MemoryStore}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator
@@ -67,17 +65,11 @@ private[spark] abstract class MemoryManager(
storageMemoryPool.setMemoryStore(store)
}
- // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
-
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return whether all N bytes were successfully granted.
*/
- def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+ def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -85,14 +77,10 @@ private[spark] abstract class MemoryManager(
* This extra method allows subclasses to differentiate behavior between acquiring storage
* memory and acquiring unroll memory. For instance, the memory management model in Spark
* 1.5 and before places a limit on the amount of space that can be freed from unrolling.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
*
* @return whether all N bytes were successfully granted.
*/
- def acquireUnrollMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+ def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 3554b558f2..f9f8f820bc 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -17,10 +17,8 @@
package org.apache.spark.memory
-import scala.collection.mutable
-
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.BlockId
/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
@@ -53,24 +51,18 @@ private[spark] class StaticMemoryManager(
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
false
} else {
- storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+ storageMemoryPool.acquireMemory(blockId, numBytes)
}
}
- override def acquireUnrollMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = storageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
@@ -80,7 +72,7 @@ private[spark] class StaticMemoryManager(
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
- storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+ storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
private[memory]
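The two `math.max`/`math.min` steps above bound how much may be evicted on behalf of unrolling. A worked instance using the numbers that StaticMemoryManagerSuite exercises later in this diff (unroll limit 400 bytes, 300 bytes already unrolled, pool full, 150 bytes requested):

```scala
val maxUnrollMemory = 400L     // maxStorageMemory * spark.storage.unrollFraction
val currentUnrollMemory = 300L // already held by unrolling tasks
val freeMemory = 0L            // the storage pool is full
val numBytes = 150L            // the new unroll request

// Never evict more than the unroll allowance leaves room for:
val maxNumBytesToFree =
  math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // 100
// ...and never more than the request still needs beyond free memory:
val numBytesToFree =
  math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) // 100

// 100 bytes evicted + 0 bytes free = 100 < 150 requested, so the acquire
// fails, matching the suite's "fail even after evicting as much as we can".
```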
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 4036484aad..6a88966f60 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -19,11 +19,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, MemoryStore}
/**
* Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -58,15 +55,11 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return whether all N bytes were successfully granted.
*/
- def acquireMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+ def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
val numBytesToFree = math.max(0, numBytes - memoryFree)
- acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+ acquireMemory(blockId, numBytes, numBytesToFree)
}
/**
@@ -80,19 +73,12 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
- numBytesToFree: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+ numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
- memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
- // Register evicted blocks, if any, with the active task metrics
- Option(TaskContext.get()).foreach { tc =>
- val metrics = tc.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
- }
+ memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -129,9 +115,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
- val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
- val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
+ val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
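With `evictBlocksToFreeSpace` now returning the bytes it freed, the pool-shrinking path no longer has to sum `memSize` over a buffer of evicted-block statuses. A rough stand-alone sketch of that consumption (a hypothetical `PoolSketch`, ignoring the used-memory accounting the real class delegates to `dropFromMemory`):

```scala
// Hypothetical simplification of StorageMemoryPool.shrinkPoolToFreeSpace.
final class PoolSketch(evictBlocksToFreeSpace: Long => Long) {
  private var poolSize: Long = 1000L
  private var memoryUsed: Long = 0L
  def memoryFree: Long = poolSize - memoryUsed

  /** Shrink the pool by up to `spaceToFree` bytes; returns bytes reclaimed. */
  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = {
    // First reclaim memory that is allocated to the pool but unused...
    val fromUnused = math.min(spaceToFree, memoryFree)
    poolSize -= fromUnused
    val remaining = spaceToFree - fromUnused
    if (remaining > 0) {
      // ...then evict blocks; the return value feeds the size adjustment directly.
      val fromEviction = evictBlocksToFreeSpace(remaining)
      poolSize -= fromEviction
      fromUnused + fromEviction
    } else {
      fromUnused
    }
  }
}
```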
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 57a24ac140..a3321e3f17 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -17,10 +17,8 @@
package org.apache.spark.memory
-import scala.collection.mutable
-
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.storage.BlockId
/**
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
@@ -133,10 +131,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
}
}
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
@@ -152,14 +147,11 @@ private[spark] class UnifiedMemoryManager private[memory] (
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
- storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+ storageMemoryPool.acquireMemory(blockId, numBytes)
}
- override def acquireUnrollMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes, evictedBlocks)
+ override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e49d79b8ad..e0a8e88df2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -612,12 +612,16 @@ private[spark] class BlockManager(
None
}
+ /**
+ * @return true if the block was stored or false if the block was already stored or an
+ * error occurred.
+ */
def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
+ effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
@@ -641,28 +645,32 @@ private[spark] class BlockManager(
/**
* Put a new block of values to the block manager.
- * Return a list of blocks updated as a result of this put.
+ *
+ * @return true if the block was stored or false if the block was already stored or an
+ * error occurred.
*/
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
+ effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}
/**
* Put a new block of serialized bytes to the block manager.
- * Return a list of blocks updated as a result of this put.
+ *
+ * @return true if the block was stored or false if the block was already stored or an
+ * error occurred.
*/
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
+ effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
@@ -674,14 +682,16 @@ private[spark] class BlockManager(
* The effective storage level refers to the level according to which the block will actually be
* handled. This allows the caller to specify an alternate behavior of doPut while preserving
* the original level specified by the user.
+ *
+ * @return true if the block was stored or false if the block was already stored or an
+ * error occurred.
*/
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None)
- : Seq[(BlockId, BlockStatus)] = {
+ effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -689,9 +699,6 @@ private[spark] class BlockManager(
require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
}
- // Return value
- val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-
/* Remember the block's storage level so that we can correctly drop it to disk if it needs
* to be dropped right after it got put into memory. Note, however, that other threads will
* not be able to get() this block until we call markReady on its BlockInfo. */
@@ -702,7 +709,7 @@ private[spark] class BlockManager(
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
- return updatedBlocks
+ return false
}
// TODO: So the block info exists - but previous attempt to load it (?) failed.
// What do we do now ? Retry on it ?
@@ -743,11 +750,12 @@ private[spark] class BlockManager(
case _ => null
}
+ var marked = false
+
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- var marked = false
try {
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
@@ -783,11 +791,6 @@ private[spark] class BlockManager(
case _ =>
}
- // Keep track of which blocks are dropped from memory
- if (putLevel.useMemory) {
- result.droppedBlocks.foreach { updatedBlocks += _ }
- }
-
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
@@ -797,7 +800,11 @@ private[spark] class BlockManager(
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
- updatedBlocks += ((blockId, putBlockStatus))
+ Option(TaskContext.get()).foreach { taskContext =>
+ val metrics = taskContext.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, putBlockStatus)))
+ }
}
} finally {
// If we failed in putting the block to memory/disk, notify other possible readers
@@ -847,7 +854,7 @@ private[spark] class BlockManager(
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
- updatedBlocks
+ marked
}
/**
@@ -967,32 +974,27 @@ private[spark] class BlockManager(
/**
* Write a block consisting of a single object.
+ *
+ * @return true if the block was stored or false if the block was already stored or an
+ * error occurred.
*/
def putSingle(
blockId: BlockId,
value: Any,
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true): Boolean = {
putIterator(blockId, Iterator(value), level, tellMaster)
}
- def dropFromMemory(
- blockId: BlockId,
- data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
- dropFromMemory(blockId, () => data)
- }
-
/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
* If `data` is not put on disk, it won't be created.
- *
- * Return the block status if the given block has been updated, else None.
*/
def dropFromMemory(
blockId: BlockId,
- data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+ data: () => Either[Array[Any], ByteBuffer]): Unit = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId)
@@ -1005,10 +1007,10 @@ private[spark] class BlockManager(
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
- return None
+ return
} else if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
- return None
+ return
}
var blockIsUpdated = false
val level = info.level
@@ -1044,11 +1046,14 @@ private[spark] class BlockManager(
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
- return Some(status)
+ Option(TaskContext.get()).foreach { taskContext =>
+ val metrics = taskContext.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
+ }
}
}
}
- None
}
/**
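The `Option(TaskContext.get()).foreach` idiom that replaces the returned statuses deserves a stand-alone look: the thread-local lookup returns null outside a running task (for example on the driver), and wrapping it in `Option` makes the metrics update a silent no-op there. A self-contained sketch with hypothetical stand-in types:

```scala
// Hypothetical stand-ins; TaskContextSketch mimics org.apache.spark.TaskContext.
final case class BlockId(name: String)
final case class BlockStatus(memSize: Long, diskSize: Long)

final class TaskContextSketch {
  var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
}

object TaskContextSketch {
  private val local = new ThreadLocal[TaskContextSketch]
  def get(): TaskContextSketch = local.get() // null when no task is running
  def setTaskContext(tc: TaskContextSketch): Unit = local.set(tc)
  def unset(): Unit = local.remove()
}

object ReportSketch {
  // The pattern doPut and dropFromMemory now share: find the active task,
  // if any, and append the block status to its metrics.
  def reportUpdatedBlock(blockId: BlockId, status: BlockStatus): Unit = {
    Option(TaskContextSketch.get()).foreach { tc =>
      val last = tc.updatedBlocks.getOrElse(Seq.empty)
      tc.updatedBlocks = Some(last ++ Seq(blockId -> status))
    }
  }
}
```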
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index bdab8c2332..76aaa782b9 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -95,9 +95,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val values = blockManager.dataDeserialize(blockId, bytes)
putIterator(blockId, values, level, returnValues = true)
} else {
- val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
- PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+ tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -110,8 +109,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
- val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks)
+ val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
val data =
if (putSuccess) {
assert(bytes.limit == size)
@@ -119,7 +117,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
} else {
null
}
- PutResult(size, data, droppedBlocks)
+ PutResult(size, data)
}
override def putArray(
@@ -127,15 +125,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
- tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
- PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
+ tryToPut(blockId, values, sizeEstimate, deserialized = true)
+ PutResult(sizeEstimate, Left(values.iterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
- tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
- PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+ tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -165,22 +162,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
level: StorageLevel,
returnValues: Boolean,
allowPersistToDisk: Boolean): PutResult = {
- val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
+ val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
val res = putArray(blockId, arrayValues, level, returnValues)
- droppedBlocks ++= res.droppedBlocks
- PutResult(res.size, res.data, droppedBlocks)
+ PutResult(res.size, res.data)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
- PutResult(res.size, res.data, droppedBlocks)
+ PutResult(res.size, res.data)
} else {
- PutResult(0, Left(iteratorValues), droppedBlocks)
+ PutResult(0, Left(iteratorValues))
}
}
}
@@ -246,11 +241,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* This method returns either an array with the contents of the entire block or an iterator
* containing the values of the block (if the array would have exceeded available memory).
*/
- def unrollSafely(
- blockId: BlockId,
- values: Iterator[Any],
- droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
- : Either[Array[Any], Iterator[Any]] = {
+ def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
// Number of elements unrolled so far
var elementsUnrolled = 0
@@ -270,7 +261,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
var vector = new SizeTrackingVector[Any]
// Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks)
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -286,8 +277,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
- keepUnrolling = reserveUnrollMemoryForThisTask(
- blockId, amountToRequest, droppedBlocks)
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
@@ -337,9 +327,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
value: Any,
size: Long,
- deserialized: Boolean,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- tryToPut(blockId, () => value, size, deserialized, droppedBlocks)
+ deserialized: Boolean): Boolean = {
+ tryToPut(blockId, () => value, size, deserialized)
}
/**
@@ -355,16 +344,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
*
- * All blocks evicted in the process, if any, will be added to `droppedBlocks`.
- *
* @return whether put was successful.
*/
private def tryToPut(
blockId: BlockId,
value: () => Any,
size: Long,
- deserialized: Boolean,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ deserialized: Boolean): Boolean = {
/* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
* to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
@@ -380,7 +366,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// happen atomically. This relies on the assumption that all memory acquisitions are
// synchronized on the same lock.
releasePendingUnrollMemoryForThisTask()
- val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
+ val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
if (enoughMemory) {
// We acquired enough memory for the block, so go ahead and put it
val entry = new MemoryEntry(value(), size, deserialized)
@@ -398,8 +384,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
} else {
Right(value().asInstanceOf[ByteBuffer].duplicate())
}
- val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
- droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
+ blockManager.dropFromMemory(blockId, () => data)
}
enoughMemory
}
@@ -413,13 +398,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*
* @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
+ * @return the amount of memory (in bytes) freed by eviction
*/
- private[spark] def evictBlocksToFreeSpace(
- blockId: Option[BlockId],
- space: Long,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
@@ -453,17 +434,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
- val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
- droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
+ blockManager.dropFromMemory(blockId, () => data)
}
}
- true
+ freedMemory
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id as it would require dropping another block " +
"from the same RDD")
}
- false
+ 0L
}
}
}
@@ -481,12 +461,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* Reserve memory for unrolling the given block for this task.
* @return whether the request is granted.
*/
- def reserveUnrollMemoryForThisTask(
- blockId: BlockId,
- memory: Long,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
memoryManager.synchronized {
- val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
+ val success = memoryManager.acquireUnrollMemory(blockId, memory)
if (success) {
val taskAttemptId = currentTaskAttemptId()
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
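The new `evictBlocksToFreeSpace` contract is "bytes actually freed" rather than "did we free enough, plus a buffer of victims". A toy LRU model of that contract (hypothetical, ignoring the real store's same-RDD guard and locking):

```scala
import scala.collection.mutable

final case class BlockId(name: String)

// Toy eviction model: entries kept in insertion (roughly LRU) order.
final class EvictionSketch {
  private val entries = mutable.LinkedHashMap.empty[BlockId, Long] // id -> size

  def put(id: BlockId, size: Long): Unit = entries += (id -> size)

  /** Returns bytes freed: either enough to satisfy `space`, or 0L. */
  def evictBlocksToFreeSpace(space: Long): Long = {
    require(space > 0)
    var freed = 0L
    val victims = mutable.ArrayBuffer.empty[BlockId]
    val it = entries.iterator
    while (freed < space && it.hasNext) {
      val (id, size) = it.next()
      victims += id
      freed += size
    }
    if (freed >= space) {
      victims.foreach(entries.remove) // drop the selected blocks
      freed
    } else {
      0L // cannot free enough; evict nothing, mirroring the real store
    }
  }
}
```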
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 30aa94c8a5..3865c201bf 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -85,7 +85,12 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
val context = TaskContext.empty()
- cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
- assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+ try {
+ TaskContext.setTaskContext(context)
+ cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
+ assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+ } finally {
+ TaskContext.unset()
+ }
}
}
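Because the metrics sink is now discovered through `TaskContext.get()`, the suite must install a context on the current thread before exercising the cache and remove it afterwards even if the assertion throws. That install/uninstall discipline, reduced to a reusable shape (hypothetical harness, not Spark's API):

```scala
object TestHarnessSketch {
  // Minimal thread-local stand-in for TaskContext.setTaskContext/unset.
  private val local = new ThreadLocal[AnyRef]
  def setTaskContext(tc: AnyRef): Unit = local.set(tc)
  def unset(): Unit = local.remove()

  // Install the context, run the body, and always clean up.
  def withTaskContext[A](tc: AnyRef)(body: => A): A = {
    setTaskContext(tc)
    try body
    finally unset()
  }
}
```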
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 3b2368798c..d9764c7c10 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -70,8 +70,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
*/
protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
- when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
- .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
+ when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
mm.setMemoryStore(ms)
ms
}
@@ -89,9 +88,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
*/
- private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = {
- new Answer[Boolean] {
- override def answer(invocation: InvocationOnMock): Boolean = {
+ private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = {
+ new Answer[Long] {
+ override def answer(invocation: InvocationOnMock): Long = {
val args = invocation.getArguments
val numBytesToFree = args(1).asInstanceOf[Long]
assert(numBytesToFree > 0)
@@ -101,20 +100,12 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
if (numBytesToFree <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
mm.releaseStorageMemory(numBytesToFree)
- args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
+ evictedBlocks.append(
(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
- // We need to add this call so that that the suite-level `evictedBlocks` is updated when
- // execution evicts storage; in that case, args.last will not be equal to evictedBlocks
- // because it will be a temporary buffer created inside of the MemoryManager rather than
- // being passed in by the test code.
- if (!(evictedBlocks eq args.last)) {
- evictedBlocks.append(
- (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
- }
- true
+ numBytesToFree
} else {
// No blocks were evicted because eviction would not free enough space.
- false
+ 0L
}
}
}
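For reference, the stubbing shape used in `makeMemoryStore`, reduced to a self-contained sketch. It assumes the Mockito 1.x API Spark used at the time (`org.mockito.Matchers`), and `Evictor` is a hypothetical trait standing in for `MemoryStore`:

```scala
import org.mockito.Matchers.{any, anyLong}
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

trait Evictor {
  def evictBlocksToFreeSpace(blockId: Option[String], space: Long): Long
}

object MockSketch {
  def main(args: Array[String]): Unit = {
    val ms = mock(classOf[Evictor])
    // The Answer's type parameter tracks the method's new Long return type.
    when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(new Answer[Long] {
      override def answer(invocation: InvocationOnMock): Long = {
        val requested = invocation.getArguments()(1).asInstanceOf[Long]
        requested // pretend eviction always frees exactly what was asked for
      }
    })
    println(ms.evictBlocksToFreeSpace(None, 42L)) // 42
  }
}
```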
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 68cf26fc3e..eee78d396e 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -81,22 +81,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val dummyBlock = TestBlockId("you can see the world you brought to live")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 10L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
- assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 100L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
+ assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
- assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem))
assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L))
assertEvictBlocksToFreeSpaceCalled(ms, 1L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
@@ -107,12 +107,12 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
@@ -134,7 +134,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
- assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 50L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
@@ -152,21 +152,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("lonely water")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.acquireUnrollMemory(dummyBlock, 100L))
when(ms.currentUnrollMemory).thenReturn(100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 800L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 860L)
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes.
// Requesting 240 more bytes of unroll memory will leave our total unroll memory at
// 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted.
- assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks))
+ assert(mm.acquireUnrollMemory(dummyBlock, 240L))
assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
assert(mm.storageMemoryUsed === 1000L)
@@ -174,7 +174,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
// We already have 300 bytes of unroll memory, so requesting 150 more will leave us
// above the 400-byte limit. Since there is not enough free memory, this request will
// fail even after evicting as much as we can (400 - 300 = 100 bytes).
- assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks))
+ assert(!mm.acquireUnrollMemory(dummyBlock, 150L))
assertEvictBlocksToFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 900L)
// Release beyond what was acquired
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 4a1e49b45d..e5cb9d3a99 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -41,14 +41,8 @@ class TestMemoryManager(conf: SparkConf)
grant
}
}
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def acquireUnrollMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
+ override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = true
+ override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = true
override def releaseStorageMemory(numBytes: Long): Unit = {}
override private[memory] def releaseExecutionMemory(
numBytes: Long,
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 6cc48597d3..0c4359c3c2 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -74,24 +74,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val maxMemory = 1000L
val (mm, ms) = makeThings(maxMemory)
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 10L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
- assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 100L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
+ assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
- assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, maxMemory))
assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
- assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L))
assertEvictBlocksToFreeSpaceCalled(ms, 1L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
@@ -102,12 +102,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
@@ -120,7 +120,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Acquire enough storage memory to exceed the storage region
- assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 750L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
@@ -140,7 +140,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
- assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 400L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 400L)
assert(mm.executionMemoryUsed === 300L)
@@ -157,7 +157,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Acquire enough storage memory to exceed the storage region size
- assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 700L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 700L)
@@ -182,11 +182,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.storageMemoryUsed === 0L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should not be able to evict execution
- assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 100L))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
+ assert(!mm.acquireStorageMemory(dummyBlock, 250L))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
// Do not attempt to evict blocks, since evicting will not free enough memory:
@@ -199,11 +199,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.storageMemoryUsed === 0L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should still not be able to evict execution
- assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 750L))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free
- assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
+ assert(!mm.acquireStorageMemory(dummyBlock, 850L))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
// Do not attempt to evict blocks, since evicting will not free enough memory:
@@ -243,7 +243,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
// Fill up all of the remaining memory with storage.
- assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 800L))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 800)
assert(mm.executionMemoryUsed === 200)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0f31561170..6e6cf6385f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -184,8 +184,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
- store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
- store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -425,8 +425,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
t2.join()
t3.join()
- store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
- store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
store.waitForAsyncReregister()
}
}
@@ -847,23 +847,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](2000))
val bigList = List.fill(8)(new Array[Byte](2000))
+ def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
+ val context = TaskContext.empty()
+ try {
+ TaskContext.setTaskContext(context)
+ task
+ } finally {
+ TaskContext.unset()
+ }
+ context.taskMetrics.updatedBlocks.getOrElse(Seq.empty)
+ }
+
// 1 updated block (i.e. list1)
- val updatedBlocks1 =
+ val updatedBlocks1 = getUpdatedBlocks {
store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks1.size === 1)
assert(updatedBlocks1.head._1 === TestBlockId("list1"))
assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 1 updated block (i.e. list2)
- val updatedBlocks2 =
+ val updatedBlocks2 = getUpdatedBlocks {
store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ }
assert(updatedBlocks2.size === 1)
assert(updatedBlocks2.head._1 === TestBlockId("list2"))
assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 2 updated blocks - list1 is kicked out of memory while list3 is added
- val updatedBlocks3 =
+ val updatedBlocks3 = getUpdatedBlocks {
store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks3.size === 2)
updatedBlocks3.foreach { case (id, status) =>
id match {
@@ -875,8 +889,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
- val updatedBlocks4 =
+ val updatedBlocks4 = getUpdatedBlocks {
store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks4.size === 2)
updatedBlocks4.foreach { case (id, status) =>
id match {
@@ -889,8 +904,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
// No updated blocks - list5 is too big to fit in store and nothing is kicked out
- val updatedBlocks5 =
+ val updatedBlocks5 = getUpdatedBlocks {
store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks5.size === 0)
// memory store contains only list3 and list4
@@ -1005,8 +1021,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
- memoryStore.reserveUnrollMemoryForThisTask(
- TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)])
+ memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
}
// Reserve
@@ -1062,11 +1077,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
val memoryStore = store.memoryStore
- val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with all the space in the world. This should succeed and return an array.
- var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
memoryStore.releasePendingUnrollMemoryForThisTask()
@@ -1074,24 +1088,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed after kicking out someBlock1.
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
- unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(droppedBlocks.size === 1)
- assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
- droppedBlocks.clear()
+ assert(memoryStore.contains("someBlock2"))
+ assert(!memoryStore.contains("someBlock1"))
memoryStore.releasePendingUnrollMemoryForThisTask()
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the mean time, however, we kicked out someBlock2 before giving up.
store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
- unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+ unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
- assert(droppedBlocks.size === 1)
- assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
- droppedBlocks.clear()
+ assert(!memoryStore.contains("someBlock2"))
}
test("safely unroll blocks through putIterator") {
@@ -1238,7 +1249,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
})
assert(result.size === 13000)
assert(result.data === null)
- assert(result.droppedBlocks === Nil)
}
test("put a small ByteBuffer to MemoryStore") {
@@ -1252,6 +1262,5 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
})
assert(result.size === 10000)
assert(result.data === Right(bytes))
- assert(result.droppedBlocks === Nil)
}
}
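The `getUpdatedBlocks` helper added to this suite is a small loan pattern: a by-name `task` parameter defers evaluation until the context is installed, so everything the task records is captured and the thread-local is always cleaned up. A stand-alone rendering with hypothetical types:

```scala
// Hypothetical harness mirroring the suite's getUpdatedBlocks helper.
final class RecordingContext {
  var updatedBlocks: Option[Seq[String]] = None
}

object HarnessSketch {
  private val local = new ThreadLocal[RecordingContext]
  def current(): Option[RecordingContext] = Option(local.get())

  // By-name `task` runs with a fresh context installed; its recordings
  // are harvested after the thread-local is removed.
  def getUpdatedBlocks(task: => Unit): Seq[String] = {
    val ctx = new RecordingContext
    local.set(ctx)
    try task
    finally local.remove()
    ctx.updatedBlocks.getOrElse(Seq.empty)
  }

  def main(args: Array[String]): Unit = {
    val blocks = getUpdatedBlocks {
      current().foreach(_.updatedBlocks = Some(Seq("list1")))
    }
    println(blocks) // List(list1)
  }
}
```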