-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/network/BlockDataManager.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfo.scala | 83
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala | 445
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 261
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStore.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 47
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala | 287
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 510
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 4
22 files changed, 1384 insertions(+), 438 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 1ec9ba7755..2b456facd9 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
+import org.apache.spark.util.CompletionIterator
/**
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
@@ -47,6 +48,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
existingMetrics.incBytesReadInternal(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
+
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsReadInternal(1)
@@ -156,7 +158,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
- arr.iterator.asInstanceOf[Iterator[T]]
+ CompletionIterator[T, Iterator[T]](
+ arr.iterator.asInstanceOf[Iterator[T]],
+ blockManager.releaseLock(key))
case Right(it) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
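
The CompletionIterator wrapper above is the mechanism that releases the newly introduced read lock once the caller has finished consuming the cached partition. A minimal sketch of the idiom (not the committed code; `blockManager` and the surrounding method are assumed to be in scope):

    import org.apache.spark.storage.BlockId
    import org.apache.spark.util.CompletionIterator

    // Assumes the current task already holds a read lock on `key`.
    def iterateAndRelease[T](key: BlockId, cached: Array[Any]): Iterator[T] = {
      CompletionIterator[T, Iterator[T]](
        cached.iterator.asInstanceOf[Iterator[T]],
        blockManager.releaseLock(key))  // by-name callback, runs once when the iterator is exhausted
    }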
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 9bd69727f6..c08f87a8b4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.Random
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream
@@ -90,22 +90,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
/**
* Divide the object into multiple blocks and put those blocks in the block manager.
+ *
* @param value the object to divide
* @return number of blocks this broadcast variable is divided into
*/
private def writeBlocks(value: T): Int = {
+ import StorageLevel._
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
- SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
- tellMaster = false)
+ val blockManager = SparkEnv.get.blockManager
+ if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
+ blockManager.releaseLock(broadcastId)
+ } else {
+ throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+ }
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
- SparkEnv.get.blockManager.putBytes(
- BroadcastBlockId(id, "piece" + i),
- block,
- StorageLevel.MEMORY_AND_DISK_SER,
- tellMaster = true)
+ val pieceId = BroadcastBlockId(id, "piece" + i)
+ if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
+ blockManager.releaseLock(pieceId)
+ } else {
+ throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+ }
}
blocks.length
}
@@ -127,16 +134,18 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
- SparkEnv.get.blockManager.putBytes(
- pieceId,
- block,
- StorageLevel.MEMORY_AND_DISK_SER,
- tellMaster = true)
+ if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+ throw new SparkException(
+ s"Failed to store $pieceId of $broadcastId in local BlockManager")
+ }
block
}
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
+ // At this point we are guaranteed to hold a read lock, since we either got the block locally
+ // or stored the remotely-fetched block and automatically downgraded the write lock.
blocks(pid) = block
+ releaseLock(pieceId)
}
blocks
}
@@ -165,8 +174,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
- SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
+ val blockManager = SparkEnv.get.blockManager
+ blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) =>
+ releaseLock(broadcastId)
x.asInstanceOf[T]
case None =>
@@ -179,13 +190,36 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
blocks, SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
- SparkEnv.get.blockManager.putSingle(
- broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ val storageLevel = StorageLevel.MEMORY_AND_DISK
+ if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+ releaseLock(broadcastId)
+ } else {
+ throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+ }
obj
}
}
}
+ /**
+ * If running in a task, register the given block's locks for release upon task completion.
+ * Otherwise, if not running in a task then immediately release the lock.
+ */
+ private def releaseLock(blockId: BlockId): Unit = {
+ val blockManager = SparkEnv.get.blockManager
+ Option(TaskContext.get()) match {
+ case Some(taskContext) =>
+ taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
+ case None =>
+ // This should only happen on the driver, where broadcast variables may be accessed
+ // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
+ // broadcast variables to be garbage collected we need to free the reference here
+ // which is slightly unsafe but is technically okay because broadcast variables aren't
+ // stored off-heap.
+ blockManager.releaseLock(blockId)
+ }
+ }
+
}
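
All of the writes in writeBlocks() and readBlocks() now follow the same store-or-fail pattern: a successful put leaves the calling thread holding a lock on the block (the write lock is downgraded to a read lock), which the broadcast code must hand back, while a failed put aborts the broadcast. A hedged sketch of that idiom, with a hypothetical helper name:

    import org.apache.spark.{SparkEnv, SparkException}
    import org.apache.spark.storage.{BlockId, StorageLevel}

    // Illustrative only; mirrors the pattern used by writeBlocks() above.
    def storeOrFail(blockId: BlockId, value: Any): Unit = {
      val bm = SparkEnv.get.blockManager
      if (bm.putSingle(blockId, value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)) {
        bm.releaseLock(blockId)  // the put left us holding a read lock; give it back
      } else {
        throw new SparkException(s"Failed to store $blockId in BlockManager")
      }
    }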
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 00be3a240d..a602fcac68 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,7 +218,9 @@ private[spark] class Executor(
threwException = false
res
} finally {
+ val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
@@ -227,6 +229,17 @@ private[spark] class Executor(
logError(errMsg)
}
}
+
+ if (releasedLocks.nonEmpty) {
+ val errMsg =
+ s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+ releasedLocks.mkString("[", ", ", "]")
+ if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+ throw new SparkException(errMsg)
+ } else {
+ logError(errMsg)
+ }
+ }
}
val taskFinish = System.currentTimeMillis()
@@ -266,8 +279,11 @@ private[spark] class Executor(
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
- env.blockManager.putBytes(
+ val putSucceeded = env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+ if (putSucceeded) {
+ env.blockManager.releaseLock(blockId)
+ }
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
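
The lock-leak check added to TaskRunner mirrors the existing managed-memory leak check: leaked locks are always released and logged, and the `spark.storage.exceptionOnPinLeak` flag decides whether they additionally fail the task. Enabling both checks in a test configuration would look roughly like this (a sketch; neither flag is required in normal operation):

    import org.apache.spark.SparkConf

    val strictConf = new SparkConf()
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")  // existing check for unreleased managed memory
      .set("spark.storage.exceptionOnPinLeak", "true")    // new check for unreleased block locks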
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 1745d52c81..cc5e851c29 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -31,6 +31,14 @@ trait BlockDataManager {
/**
* Put the block locally, using the given storage level.
+ *
+ * Returns true if the block was stored and false if the put operation failed or the block
+ * already existed.
*/
- def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
+ def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean
+
+ /**
+ * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
+ */
+ def releaseLock(blockId: BlockId): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index df8c21fb83..e4246df83a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -65,7 +65,11 @@ class NettyBlockRpcServer(
val level: StorageLevel =
serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
- blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
+ val blockId = BlockId(uploadBlock.blockId)
+ val putSucceeded = blockManager.putBlockData(blockId, data, level)
+ if (putSucceeded) {
+ blockManager.releaseLock(blockId)
+ }
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a49f3716e2..5c68d001f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -64,6 +64,7 @@ private[spark] abstract class Task[T](
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem): T = {
+ SparkEnv.get.blockManager.registerTask(taskAttemptId)
context = new TaskContextImpl(
stageId,
partitionId,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
deleted file mode 100644
index 22fdf73e9d..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.ConcurrentHashMap
-
-private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
- // To save space, 'pending' and 'failed' are encoded as special sizes:
- @volatile var size: Long = BlockInfo.BLOCK_PENDING
- private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
- private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
- private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
-
- setInitThread()
-
- private def setInitThread() {
- /* Set current thread as init thread - waitForReady will not block this thread
- * (in case there is non trivial initialization which ends up calling waitForReady
- * as part of initialization itself) */
- BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
- }
-
- /**
- * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
- * Return true if the block is available, false otherwise.
- */
- def waitForReady(): Boolean = {
- if (pending && initThread != Thread.currentThread()) {
- synchronized {
- while (pending) {
- this.wait()
- }
- }
- }
- !failed
- }
-
- /** Mark this BlockInfo as ready (i.e. block is finished writing) */
- def markReady(sizeInBytes: Long) {
- require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
- assert(pending)
- size = sizeInBytes
- BlockInfo.blockInfoInitThreads.remove(this)
- synchronized {
- this.notifyAll()
- }
- }
-
- /** Mark this BlockInfo as ready but failed */
- def markFailure() {
- assert(pending)
- size = BlockInfo.BLOCK_FAILED
- BlockInfo.blockInfoInitThreads.remove(this)
- synchronized {
- this.notifyAll()
- }
- }
-}
-
-private object BlockInfo {
- /* initThread is logically a BlockInfo field, but we store it here because
- * it's only needed while this block is in the 'pending' state and we want
- * to minimize BlockInfo's memory footprint. */
- private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
-
- private val BLOCK_PENDING: Long = -1L
- private val BLOCK_FAILED: Long = -2L
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
new file mode 100644
index 0000000000..0eda97e58d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.common.collect.ConcurrentHashMultiset
+
+import org.apache.spark.{Logging, SparkException, TaskContext}
+
+
+/**
+ * Tracks metadata for an individual block.
+ *
+ * Instances of this class are _not_ thread-safe and are protected by locks in the
+ * [[BlockInfoManager]].
+ *
+ * @param level the block's storage level. This is the requested persistence level, not the
+ * effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this
+ * does not imply that the block is actually resident in memory).
+ * @param tellMaster whether state changes for this block should be reported to the master. This
+ * is true for most blocks, but is false for broadcast blocks.
+ */
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+
+ /**
+ * The size of the block (in bytes)
+ */
+ def size: Long = _size
+ def size_=(s: Long): Unit = {
+ _size = s
+ checkInvariants()
+ }
+ private[this] var _size: Long = 0
+
+ /**
+ * The number of times that this block has been locked for reading.
+ */
+ def readerCount: Int = _readerCount
+ def readerCount_=(c: Int): Unit = {
+ _readerCount = c
+ checkInvariants()
+ }
+ private[this] var _readerCount: Int = 0
+
+ /**
+ * The task attempt id of the task which currently holds the write lock for this block, or
+ * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
+ * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
+ */
+ def writerTask: Long = _writerTask
+ def writerTask_=(t: Long): Unit = {
+ _writerTask = t
+ checkInvariants()
+ }
+ private[this] var _writerTask: Long = 0
+
+ /**
+ * True if this block has been removed from the BlockManager and false otherwise.
+ * This field is used to communicate block deletion to blocked readers / writers (see its usage
+ * in [[BlockInfoManager]]).
+ */
+ def removed: Boolean = _removed
+ def removed_=(r: Boolean): Unit = {
+ _removed = r
+ checkInvariants()
+ }
+ private[this] var _removed: Boolean = false
+
+ private def checkInvariants(): Unit = {
+ // A block's reader count must be non-negative:
+ assert(_readerCount >= 0)
+ // A block is either locked for reading or for writing, but not for both at the same time:
+ assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
+ // If a block is removed then it is not locked:
+ assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
+ }
+
+ checkInvariants()
+}
+
+private[storage] object BlockInfo {
+
+ /**
+ * Special task attempt id constant used to mark a block's write lock as being unlocked.
+ */
+ val NO_WRITER: Long = -1
+
+ /**
+ * Special task attempt id constant used to mark a block's write lock as being held by
+ * a non-task thread (e.g. by a driver thread or by unit test code).
+ */
+ val NON_TASK_WRITER: Long = -1024
+}
+
+/**
+ * Component of the [[BlockManager]] which tracks metadata for blocks and manages block locking.
+ *
+ * The locking interface exposed by this class is a readers-writer lock. Every lock acquisition is
+ * automatically associated with a running task and locks are automatically released upon task
+ * completion or failure.
+ *
+ * This class is thread-safe.
+ */
+private[storage] class BlockInfoManager extends Logging {
+
+ private type TaskAttemptId = Long
+
+ /**
+ * Used to look up metadata for individual blocks. Entries are added to this map via an atomic
+ * set-if-not-exists operation ([[lockNewBlockForWriting()]]) and are removed
+ * by [[removeBlock()]].
+ */
+ @GuardedBy("this")
+ private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
+
+ /**
+ * Tracks the set of blocks that each task has locked for writing.
+ */
+ @GuardedBy("this")
+ private[this] val writeLocksByTask =
+ new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
+ with mutable.MultiMap[TaskAttemptId, BlockId]
+
+ /**
+ * Tracks the set of blocks that each task has locked for reading, along with the number of times
+ * that a block has been locked (since our read locks are re-entrant).
+ */
+ @GuardedBy("this")
+ private[this] val readLocksByTask =
+ new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
+
+ // ----------------------------------------------------------------------------------------------
+
+ // Initialization for special task attempt ids:
+ registerTask(BlockInfo.NON_TASK_WRITER)
+
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * Called at the start of a task in order to register that task with this [[BlockInfoManager]].
+ * This must be called prior to calling any other BlockInfoManager methods from that task.
+ */
+ def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
+ require(!readLocksByTask.contains(taskAttemptId),
+ s"Task attempt $taskAttemptId is already registered")
+ readLocksByTask(taskAttemptId) = ConcurrentHashMultiset.create()
+ }
+
+ /**
+ * Returns the current task's task attempt id (which uniquely identifies the task), or
+ * [[BlockInfo.NON_TASK_WRITER]] if called by a non-task thread.
+ */
+ private def currentTaskAttemptId: TaskAttemptId = {
+ Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(BlockInfo.NON_TASK_WRITER)
+ }
+
+ /**
+ * Lock a block for reading and return its metadata.
+ *
+ * If another task has already locked this block for reading, then the read lock will be
+ * immediately granted to the calling task and its lock count will be incremented.
+ *
+ * If another task has locked this block for writing, then this call will block until the write
+ * lock is released or will return immediately if `blocking = false`.
+ *
+ * A single task can lock a block multiple times for reading, in which case each lock will need
+ * to be released separately.
+ *
+ * @param blockId the block to lock.
+ * @param blocking if true (default), this call will block until the lock is acquired. If false,
+ * this call will return immediately if the lock acquisition fails.
+ * @return None if the block did not exist or was removed (in which case no lock is held), or
+ * Some(BlockInfo) (in which case the block is locked for reading).
+ */
+ def lockForReading(
+ blockId: BlockId,
+ blocking: Boolean = true): Option[BlockInfo] = synchronized {
+ logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
+ infos.get(blockId).map { info =>
+ while (info.writerTask != BlockInfo.NO_WRITER) {
+ if (blocking) wait() else return None
+ }
+ if (info.removed) return None
+ info.readerCount += 1
+ readLocksByTask(currentTaskAttemptId).add(blockId)
+ logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
+ info
+ }
+ }
+
+ /**
+ * Lock a block for writing and return its metadata.
+ *
+ * If another task has already locked this block for either reading or writing, then this call
+ * will block until the other locks are released or will return immediately if `blocking = false`.
+ *
+ * If this is called by a task which already holds the block's exclusive write lock, then this
+ * method will throw an exception.
+ *
+ * @param blockId the block to lock.
+ * @param blocking if true (default), this call will block until the lock is acquired. If false,
+ * this call will return immediately if the lock acquisition fails.
+ * @return None if the block did not exist or was removed (in which case no lock is held), or
+ * Some(BlockInfo) (in which case the block is locked for writing).
+ */
+ def lockForWriting(
+ blockId: BlockId,
+ blocking: Boolean = true): Option[BlockInfo] = synchronized {
+ logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
+ infos.get(blockId).map { info =>
+ if (info.writerTask == currentTaskAttemptId) {
+ throw new IllegalStateException(
+ s"Task $currentTaskAttemptId has already locked $blockId for writing")
+ } else {
+ while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
+ if (blocking) wait() else return None
+ }
+ if (info.removed) return None
+ }
+ info.writerTask = currentTaskAttemptId
+ writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+ logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
+ info
+ }
+ }
+
+ /**
+ * Throws an exception if the current task does not hold a write lock on the given block.
+ * Otherwise, returns the block's BlockInfo.
+ */
+ def assertBlockIsLockedForWriting(blockId: BlockId): BlockInfo = synchronized {
+ infos.get(blockId) match {
+ case Some(info) =>
+ if (info.writerTask != currentTaskAttemptId) {
+ throw new SparkException(
+ s"Task $currentTaskAttemptId has not locked block $blockId for writing")
+ } else {
+ info
+ }
+ case None =>
+ throw new SparkException(s"Block $blockId does not exist")
+ }
+ }
+
+ /**
+ * Get a block's metadata without acquiring any locks. This method is only exposed for use by
+ * [[BlockManager.getStatus()]] and should not be called by other code outside of this class.
+ */
+ private[storage] def get(blockId: BlockId): Option[BlockInfo] = synchronized {
+ infos.get(blockId)
+ }
+
+ /**
+ * Downgrades an exclusive write lock to a shared read lock.
+ */
+ def downgradeLock(blockId: BlockId): Unit = synchronized {
+ logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
+ val info = get(blockId).get
+ require(info.writerTask == currentTaskAttemptId,
+ s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
+ s" block $blockId")
+ unlock(blockId)
+ val lockOutcome = lockForReading(blockId, blocking = false)
+ assert(lockOutcome.isDefined)
+ }
+
+ /**
+ * Release a lock on the given block.
+ */
+ def unlock(blockId: BlockId): Unit = synchronized {
+ logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+ val info = get(blockId).getOrElse {
+ throw new IllegalStateException(s"Block $blockId not found")
+ }
+ if (info.writerTask != BlockInfo.NO_WRITER) {
+ info.writerTask = BlockInfo.NO_WRITER
+ writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+ } else {
+ assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
+ info.readerCount -= 1
+ val countsForTask = readLocksByTask(currentTaskAttemptId)
+ val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
+ assert(newPinCountForTask >= 0,
+ s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
+ }
+ notifyAll()
+ }
+
+ /**
+ * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
+ * exist.
+ *
+ * @param blockId the block id.
+ * @param newBlockInfo the block info for the new block.
+ * @return true if the block did not already exist, false otherwise. If this returns false, then
+ * no new locks are acquired. If this returns true, a write lock on the new block will
+ * be held.
+ */
+ def lockNewBlockForWriting(
+ blockId: BlockId,
+ newBlockInfo: BlockInfo): Boolean = synchronized {
+ logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
+ if (!infos.contains(blockId)) {
+ infos(blockId) = newBlockInfo
+ newBlockInfo.writerTask = currentTaskAttemptId
+ writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+ logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
+ true
+ } else {
+ logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
+ s"because that block already exists")
+ false
+ }
+ }
+
+ /**
+ * Release all locks held by the given task, clearing that task's pin bookkeeping
+ * structures and updating the global pin counts. This method should be called at the
+ * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+ *
+ * @return the ids of blocks whose pins were released
+ */
+ def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
+ val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
+
+ val readLocks = synchronized {
+ readLocksByTask.remove(taskAttemptId).get
+ }
+ val writeLocks = synchronized {
+ writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
+ }
+
+ for (blockId <- writeLocks) {
+ infos.get(blockId).foreach { info =>
+ assert(info.writerTask == taskAttemptId)
+ info.writerTask = BlockInfo.NO_WRITER
+ }
+ blocksWithReleasedLocks += blockId
+ }
+ readLocks.entrySet().iterator().asScala.foreach { entry =>
+ val blockId = entry.getElement
+ val lockCount = entry.getCount
+ blocksWithReleasedLocks += blockId
+ synchronized {
+ get(blockId).foreach { info =>
+ info.readerCount -= lockCount
+ assert(info.readerCount >= 0)
+ }
+ }
+ }
+
+ synchronized {
+ notifyAll()
+ }
+ blocksWithReleasedLocks
+ }
+
+ /**
+ * Returns the number of blocks tracked.
+ */
+ def size: Int = synchronized {
+ infos.size
+ }
+
+ /**
+ * Return the number of map entries in this pin counter's internal data structures.
+ * This is used in unit tests in order to detect memory leaks.
+ */
+ private[storage] def getNumberOfMapEntries: Long = synchronized {
+ size +
+ readLocksByTask.size +
+ readLocksByTask.map(_._2.size()).sum +
+ writeLocksByTask.size +
+ writeLocksByTask.map(_._2.size).sum
+ }
+
+ /**
+ * Returns an iterator over a snapshot of all blocks' metadata. Note that the individual entries
+ * in this iterator are mutable and thus may reflect blocks that are deleted while the iterator
+ * is being traversed.
+ */
+ def entries: Iterator[(BlockId, BlockInfo)] = synchronized {
+ infos.toArray.toIterator
+ }
+
+ /**
+ * Removes the given block and releases the write lock on it.
+ *
+ * This can only be called while holding a write lock on the given block.
+ */
+ def removeBlock(blockId: BlockId): Unit = synchronized {
+ logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
+ infos.get(blockId) match {
+ case Some(blockInfo) =>
+ if (blockInfo.writerTask != currentTaskAttemptId) {
+ throw new IllegalStateException(
+ s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
+ } else {
+ infos.remove(blockId)
+ blockInfo.readerCount = 0
+ blockInfo.writerTask = BlockInfo.NO_WRITER
+ blockInfo.removed = true
+ }
+ case None =>
+ throw new IllegalArgumentException(
+ s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
+ }
+ notifyAll()
+ }
+
+ /**
+ * Delete all state. Called during shutdown.
+ */
+ def clear(): Unit = synchronized {
+ infos.valuesIterator.foreach { blockInfo =>
+ blockInfo.readerCount = 0
+ blockInfo.writerTask = BlockInfo.NO_WRITER
+ blockInfo.removed = true
+ }
+ infos.clear()
+ readLocksByTask.clear()
+ writeLocksByTask.clear()
+ notifyAll()
+ }
+
+}
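
Putting the pieces of BlockInfoManager together, a block's lock lifecycle for a single non-task thread can be sketched as follows (illustrative only; these classes are private[storage], so such code would only compile from within that package, e.g. in a test suite):

    import org.apache.spark.storage._

    val manager = new BlockInfoManager            // registers BlockInfo.NON_TASK_WRITER itself
    val blockId = TestBlockId("example")

    // Writer path: atomically create the metadata and acquire the write lock.
    val created = manager.lockNewBlockForWriting(
      blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
    assert(created)

    // Publish the block: keep a read lock for ourselves but let other readers in.
    manager.downgradeLock(blockId)

    // Reader path: read locks are re-entrant, so each acquisition needs its own unlock().
    assert(manager.lockForReading(blockId).isDefined)
    manager.unlock(blockId)                        // releases the reader's lock
    manager.unlock(blockId)                        // releases the lock kept after the downgrade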
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 77fd03a6bc..29124b368e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
@@ -77,7 +75,7 @@ private[spark] class BlockManager(
val diskBlockManager = new DiskBlockManager(this, conf)
- private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]
+ private[storage] val blockInfoManager = new BlockInfoManager
private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
@@ -223,8 +221,8 @@ private[spark] class BlockManager(
* will be made then.
*/
private def reportAllBlocks(): Unit = {
- logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
- for ((blockId, info) <- blockInfo.asScala) {
+ logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
+ for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
logError(s"Failed to report $blockId to master; giving up.")
@@ -286,7 +284,7 @@ private[spark] class BlockManager(
.asInstanceOf[Option[ByteBuffer]]
if (blockBytesOpt.isDefined) {
val buffer = blockBytesOpt.get
- new NioManagedBuffer(buffer)
+ new BlockManagerManagedBuffer(this, blockId, buffer)
} else {
throw new BlockNotFoundException(blockId.toString)
}
@@ -296,7 +294,7 @@ private[spark] class BlockManager(
/**
* Put the block locally, using the given storage level.
*/
- override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit = {
+ override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
putBytes(blockId, data.nioByteBuffer(), level)
}
@@ -305,7 +303,7 @@ private[spark] class BlockManager(
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
- blockInfo.asScala.get(blockId).map { info =>
+ blockInfoManager.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
@@ -318,7 +316,12 @@ private[spark] class BlockManager(
* may not know of).
*/
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
- (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+ // The `toArray` is necessary here in order to force the list to be materialized so that we
+ // don't try to serialize a lazy iterator when responding to client requests.
+ (blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
+ .filter(filter)
+ .toArray
+ .toSeq
}
/**
@@ -425,26 +428,11 @@ private[spark] class BlockManager(
}
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
- val info = blockInfo.get(blockId)
- if (info != null) {
- info.synchronized {
- // Double check to make sure the block is still there. There is a small chance that the
- // block has been removed by removeBlock (which also synchronizes on the blockInfo object).
- // Note that this only checks metadata tracking. If user intentionally deleted the block
- // on disk or from off heap storage without using removeBlock, this conditional check will
- // still pass but eventually we will get an exception because we can't find the block.
- if (blockInfo.asScala.get(blockId).isEmpty) {
- logWarning(s"Block $blockId had been removed")
- return None
- }
-
- // If another thread is writing the block, wait for it to become ready.
- if (!info.waitForReady()) {
- // If we get here, the block write failed.
- logWarning(s"Block $blockId was marked as failure.")
- return None
- }
-
+ blockInfoManager.lockForReading(blockId) match {
+ case None =>
+ logDebug(s"Block $blockId was not found")
+ None
+ case Some(info) =>
val level = info.level
logDebug(s"Level for block $blockId is $level")
@@ -452,7 +440,10 @@ private[spark] class BlockManager(
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asBlockResult) {
- memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
+ memoryStore.getValues(blockId).map { iter =>
+ val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+ new BlockResult(ci, DataReadMethod.Memory, info.size)
+ }
} else {
memoryStore.getBytes(blockId)
}
@@ -470,6 +461,7 @@ private[spark] class BlockManager(
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
case Some(b) => b
case None =>
+ releaseLock(blockId)
throw new BlockException(
blockId, s"Block $blockId not found on disk, though it should be")
}
@@ -478,8 +470,9 @@ private[spark] class BlockManager(
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
if (asBlockResult) {
- return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
- info.size))
+ val iter = CompletionIterator[Any, Iterator[Any]](
+ dataDeserialize(blockId, bytes), releaseLock(blockId))
+ return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
} else {
return Some(bytes)
}
@@ -511,26 +504,34 @@ private[spark] class BlockManager(
// space to unroll the block. Either way, the put here should return an iterator.
putResult.data match {
case Left(it) =>
- return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+ val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
+ return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
case _ =>
// This only happens if we dropped the values back to disk (which is never)
throw new SparkException("Memory store did not return an iterator!")
}
} else {
- return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
+ val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
+ return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
}
}
}
+ } else {
+ // This branch represents a case where the BlockInfoManager contained an entry for
+ // the block but the block could not be found in any of the block stores. This case
+ // should never occur, but for completeness's sake we address it here.
+ logError(
+ s"Block $blockId is supposedly stored locally but was not found in any block store")
+ releaseLock(blockId)
+ None
}
- }
- } else {
- logDebug(s"Block $blockId not registered locally")
}
- None
}
/**
* Get block from remote block managers.
+ *
+ * This does not acquire a lock on this block in this JVM.
*/
def getRemote(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting remote block $blockId")
@@ -597,6 +598,10 @@ private[spark] class BlockManager(
/**
* Get a block from the block manager (either local or remote).
+ *
+ * This acquires a read lock on the block if the block was stored locally and does not acquire
+ * any locks if the block was fetched from a remote block manager. The read lock will
+ * automatically be freed once the result's `data` iterator is fully consumed.
*/
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
@@ -613,6 +618,36 @@ private[spark] class BlockManager(
}
/**
+ * Downgrades an exclusive write lock to a shared read lock.
+ */
+ def downgradeLock(blockId: BlockId): Unit = {
+ blockInfoManager.downgradeLock(blockId)
+ }
+
+ /**
+ * Release a lock on the given block.
+ */
+ def releaseLock(blockId: BlockId): Unit = {
+ blockInfoManager.unlock(blockId)
+ }
+
+ /**
+ * Registers a task with the BlockManager in order to initialize per-task bookkeeping structures.
+ */
+ def registerTask(taskAttemptId: Long): Unit = {
+ blockInfoManager.registerTask(taskAttemptId)
+ }
+
+ /**
+ * Release all locks for the given task.
+ *
+ * @return the blocks whose locks were released.
+ */
+ def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
+ blockInfoManager.releaseAllLocksForTask(taskAttemptId)
+ }
+
+ /**
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
*/
@@ -703,19 +738,12 @@ private[spark] class BlockManager(
* to be dropped right after it got put into memory. Note, however, that other threads will
* not be able to get() this block until we call markReady on its BlockInfo. */
val putBlockInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
- // Do atomically !
- val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
- if (oldBlockOpt.isDefined) {
- if (oldBlockOpt.get.waitForReady()) {
- logWarning(s"Block $blockId already exists on this machine; not re-adding it")
- return false
- }
- // TODO: So the block info exists - but previous attempt to load it (?) failed.
- // What do we do now ? Retry on it ?
- oldBlockOpt.get
+ val newInfo = new BlockInfo(level, tellMaster)
+ if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+ newInfo
} else {
- tinfo
+ logWarning(s"Block $blockId already exists on this machine; not re-adding it")
+ return false
}
}
@@ -750,7 +778,7 @@ private[spark] class BlockManager(
case _ => null
}
- var marked = false
+ var blockWasSuccessfullyStored = false
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
@@ -792,11 +820,11 @@ private[spark] class BlockManager(
}
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
- if (putBlockStatus.storageLevel != StorageLevel.NONE) {
+ blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
- marked = true
- putBlockInfo.markReady(size)
+ putBlockInfo.size = size
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
@@ -805,13 +833,10 @@ private[spark] class BlockManager(
}
}
} finally {
- // If we failed in putting the block to memory/disk, notify other possible readers
- // that it has failed, and then remove it from the block info map.
- if (!marked) {
- // Note that the remove must happen before markFailure otherwise another thread
- // could've inserted a new BlockInfo before we remove it.
- blockInfo.remove(blockId)
- putBlockInfo.markFailure()
+ if (blockWasSuccessfullyStored) {
+ blockInfoManager.downgradeLock(blockId)
+ } else {
+ blockInfoManager.removeBlock(blockId)
logWarning(s"Putting block $blockId failed")
}
}
@@ -852,7 +877,7 @@ private[spark] class BlockManager(
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
- marked
+ blockWasSuccessfullyStored
}
/**
@@ -989,77 +1014,63 @@ private[spark] class BlockManager(
* store reaches its limit and needs to free up space.
*
* If `data` is not put on disk, it won't be created.
+ *
+ * The caller of this method must hold a write lock on the block before calling this method.
+ * This method does not release the write lock.
+ *
+ * @return the block's new effective StorageLevel.
*/
def dropFromMemory(
blockId: BlockId,
- data: () => Either[Array[Any], ByteBuffer]): Unit = {
-
+ data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
- val info = blockInfo.get(blockId)
-
- // If the block has not already been dropped
- if (info != null) {
- info.synchronized {
- // required ? As of now, this will be invoked only for blocks which are ready
- // But in case this changes in future, adding for consistency sake.
- if (!info.waitForReady()) {
- // If we get here, the block write failed.
- logWarning(s"Block $blockId was marked as failure. Nothing to drop")
- return
- } else if (blockInfo.asScala.get(blockId).isEmpty) {
- logWarning(s"Block $blockId was already dropped.")
- return
- }
- var blockIsUpdated = false
- val level = info.level
-
- // Drop to disk, if storage level requires
- if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo(s"Writing block $blockId to disk")
- data() match {
- case Left(elements) =>
- diskStore.putArray(blockId, elements, level, returnValues = false)
- case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
- }
- blockIsUpdated = true
- }
+ val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+ var blockIsUpdated = false
+ val level = info.level
+
+ // Drop to disk, if storage level requires
+ if (level.useDisk && !diskStore.contains(blockId)) {
+ logInfo(s"Writing block $blockId to disk")
+ data() match {
+ case Left(elements) =>
+ diskStore.putArray(blockId, elements, level, returnValues = false)
+ case Right(bytes) =>
+ diskStore.putBytes(blockId, bytes, level)
+ }
+ blockIsUpdated = true
+ }
- // Actually drop from memory store
- val droppedMemorySize =
- if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
- val blockIsRemoved = memoryStore.remove(blockId)
- if (blockIsRemoved) {
- blockIsUpdated = true
- } else {
- logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
- }
+ // Actually drop from memory store
+ val droppedMemorySize =
+ if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+ val blockIsRemoved = memoryStore.remove(blockId)
+ if (blockIsRemoved) {
+ blockIsUpdated = true
+ } else {
+ logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
+ }
- val status = getCurrentBlockStatus(blockId, info)
- if (info.tellMaster) {
- reportBlockStatus(blockId, info, status, droppedMemorySize)
- }
- if (!level.useDisk) {
- // The block is completely gone from this node; forget it so we can put() it again later.
- blockInfo.remove(blockId)
- }
- if (blockIsUpdated) {
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
- }
- }
+ val status = getCurrentBlockStatus(blockId, info)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId, info, status, droppedMemorySize)
+ }
+ if (blockIsUpdated) {
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
}
}
+ status.storageLevel
}
/**
* Remove all blocks belonging to the given RDD.
+ *
* @return The number of blocks removed.
*/
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
- val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+ val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}
@@ -1069,7 +1080,7 @@ private[spark] class BlockManager(
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logDebug(s"Removing broadcast $broadcastId")
- val blocksToRemove = blockInfo.asScala.keys.collect {
+ val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1081,9 +1092,11 @@ private[spark] class BlockManager(
*/
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
- val info = blockInfo.get(blockId)
- if (info != null) {
- info.synchronized {
+ blockInfoManager.lockForWriting(blockId) match {
+ case None =>
+ // The block has already been removed; do nothing.
+ logWarning(s"Asked to remove block $blockId, which does not exist")
+ case Some(info) =>
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
@@ -1091,15 +1104,11 @@ private[spark] class BlockManager(
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
- blockInfo.remove(blockId)
+ blockInfoManager.removeBlock(blockId)
if (tellMaster && info.tellMaster) {
val status = getCurrentBlockStatus(blockId, info)
reportBlockStatus(blockId, info, status)
}
- }
- } else {
- // The block has already been removed; do nothing.
- logWarning(s"Asked to remove block $blockId, which does not exist")
}
}
@@ -1174,7 +1183,7 @@ private[spark] class BlockManager(
}
diskBlockManager.stop()
rpcEnv.stop(slaveEndpoint)
- blockInfo.clear()
+ blockInfoManager.clear()
memoryStore.clear()
diskStore.clear()
futureExecutionContext.shutdownNow()
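
From a caller's perspective, the BlockManager contract after this change is: a local get() returns a BlockResult whose read lock is released automatically once its `data` iterator is fully consumed; remote fetches hold no lock; and a successful put leaves the caller holding a lock that it must release (as the broadcast and executor changes above do). A hedged sketch of the read path, assuming a registered task and a `blockManager` in scope:

    blockManager.get(blockId) match {
      case Some(result) =>
        // Fully consuming the iterator fires the CompletionIterator callback, which
        // releases the read lock for locally-stored blocks; remote results hold no lock.
        result.data.foreach(println)
      case None =>
        // The block was not found locally or remotely; no lock is held.
        println(s"Block $blockId was not found")
    }

Callers that stop consuming the iterator early are responsible for releasing the lock themselves via releaseLock(blockId).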
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
new file mode 100644
index 0000000000..5886b9c00b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+
+/**
+ * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
+ * so that the corresponding block's read lock can be released once this buffer's references
+ * are released.
+ *
+ * This is effectively a wrapper / bridge to connect the BlockManager's notion of read locks
+ * to the network layer's notion of retain / release counts.
+ */
+private[storage] class BlockManagerManagedBuffer(
+ blockManager: BlockManager,
+ blockId: BlockId,
+ buf: ByteBuffer) extends NioManagedBuffer(buf) {
+
+ override def retain(): ManagedBuffer = {
+ super.retain()
+ val locked = blockManager.blockInfoManager.lockForReading(blockId, blocking = false)
+ assert(locked.isDefined)
+ this
+ }
+
+ override def release(): ManagedBuffer = {
+ blockManager.releaseLock(blockId)
+ super.release()
+ }
+}
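
BlockManagerManagedBuffer ties the network layer's reference counts to the block's read-lock count: every retain() acquires an additional read lock and every release() gives one back, so the block cannot be removed or evicted while its bytes are still referenced by an in-flight response. A hedged usage sketch for a locally stored, non-shuffle block (variable names are hypothetical):

    val buffer = blockManager.getBlockData(blockId)  // BlockManagerManagedBuffer; one read lock held
    val ref = buffer.retain()                        // +1 reference, +1 read lock
    try {
      val bytes = ref.nioByteBuffer()                // safe: the block cannot be dropped underneath us
      // ... hand `bytes` to the transport layer ...
    } finally {
      ref.release()                                  // -1 reference, -1 read lock
      buffer.release()                               // releases the lock taken by getBlockData()
    }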
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 69985c9759..6f6a6773ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -60,8 +60,10 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
/**
* Remove a block, if it exists.
+ *
* @param blockId the block to remove.
* @return True if the block was found and removed, False otherwise.
+ * @throws IllegalStateException if the block is pinned by a task.
*/
def remove(blockId: BlockId): Boolean
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 024b660ce6..2f16c8f3d8 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -95,7 +95,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val values = blockManager.dataDeserialize(blockId, bytes)
putIterator(blockId, values, level, returnValues = true)
} else {
- tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+ tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -127,11 +127,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
returnValues: Boolean): PutResult = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
- tryToPut(blockId, values, sizeEstimate, deserialized = true)
+ tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
PutResult(sizeEstimate, Left(values.iterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
- tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+ tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -208,7 +208,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
- val entry = entries.synchronized { entries.remove(blockId) }
+ val entry = entries.synchronized {
+ entries.remove(blockId)
+ }
if (entry != null) {
memoryManager.releaseStorageMemory(entry.size)
logDebug(s"Block $blockId of size ${entry.size} dropped " +
@@ -327,14 +329,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId.asRDDId.map(_.rddId)
}
- private def tryToPut(
- blockId: BlockId,
- value: Any,
- size: Long,
- deserialized: Boolean): Boolean = {
- tryToPut(blockId, () => value, size, deserialized)
- }
-
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
@@ -410,6 +404,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
+ def blockIsEvictable(blockId: BlockId): Boolean = {
+ rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+ }
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
@@ -418,9 +415,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
- if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
- selectedBlocks += blockId
- freedMemory += pair.getValue.size
+ if (blockIsEvictable(blockId)) {
+ // We don't want to evict blocks which are currently being read, so we need to obtain
+ // an exclusive write lock on blocks which are candidates for eviction. We perform a
+ // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
+ if (blockManager.blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
+ selectedBlocks += blockId
+ freedMemory += pair.getValue.size
+ }
}
}
}
@@ -438,7 +440,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
- blockManager.dropFromMemory(blockId, () => data)
+ val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
+ if (newEffectiveStorageLevel.isValid) {
+ // The block is still present in at least one store, so release the lock
+ // but don't delete the block info
+ blockManager.releaseLock(blockId)
+ } else {
+ // The block isn't present in any store, so delete the block info so that the
+ // block can be stored again
+ blockManager.blockInfoManager.removeBlock(blockId)
+ }
}
}
freedMemory
@@ -447,6 +458,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
logInfo(s"Will not store $id as it would require dropping another block " +
"from the same RDD")
}
+ selectedBlocks.foreach { id =>
+ blockManager.releaseLock(id)
+ }
0L
}
}
@@ -463,6 +477,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
/**
* Reserve memory for unrolling the given block for this task.
+ *
* @return whether the request is granted.
*/
def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
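
The eviction change is the subtle part of the MemoryStore diff: a block is only a candidate for eviction if the evicting thread can take its write lock without blocking, so blocks that some task is currently reading are skipped rather than dropped out from under the reader, and locks acquired for candidates that end up not being evicted are released again. The selection logic reduces to roughly this (a sketch using the names introduced above):

    if (blockIsEvictable(candidateId) &&
        blockManager.blockInfoManager.lockForWriting(candidateId, blocking = false).isDefined) {
      // We now hold the write lock; no task can be reading this block.
      selectedBlocks += candidateId
      freedMemory += entries.get(candidateId).size
    }
    // If enough memory was found, dropFromMemory() is called for each selected block and its lock
    // is either released (block still on disk) or removed together with the block info; otherwise
    // every acquired write lock is released untouched via blockManager.releaseLock().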
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 48a0282b30..ffc02bcb01 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -87,6 +87,7 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
val context = TaskContext.empty()
try {
TaskContext.setTaskContext(context)
+ sc.env.blockManager.registerTask(0)
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
assert(context.taskMetrics.updatedBlockStatuses.size === 2)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 7b02380917..d1e806b2eb 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -485,7 +485,8 @@ class CleanerTester(
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
eventually(waitTimeout, interval(100 millis)) {
- assert(isAllCleanedUp)
+ assert(isAllCleanedUp,
+ "The following resources were not cleaned up:\n" + uncleanedResourcesToString)
}
postCleanupValidate()
} finally {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index a0483f6483..c1484b0afa 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.serializer.KryoDistributedTest._
import org.apache.spark.util.Utils
-class KryoSerializerDistributedSuite extends SparkFunSuite {
+class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContext {
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
@@ -34,7 +34,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
conf.setJars(List(jar.getPath))
- val sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val original = Thread.currentThread.getContextClassLoader
val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
SparkEnv.get.serializer.setDefaultClassLoader(loader)
@@ -47,8 +47,6 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
// Join the two RDDs, and force evaluation
assert(shuffledRDD.join(cachedRDD).collect().size == 1)
-
- LocalSparkContext.stop(sc)
}
}
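The KryoSerializerDistributedSuite change above moves SparkContext cleanup into the LocalSparkContext mixin instead of an explicit stop() call at the end of the test, so the context is stopped even when an assertion fails mid-test. A rough sketch of what such a mixin does (hypothetical name and shape, assuming only ScalaTest and spark-core; the real LocalSparkContext trait may differ in detail):

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

trait StopsSparkContextAfterEach extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override protected def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()   // always stop, even when the test body threw
        sc = null
      }
    } finally {
      super.afterEach()
    }
  }
}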
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
new file mode 100644
index 0000000000..662b18f667
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.implicitConversions
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext, TaskContextImpl}
+
+
+class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ private implicit val ec = ExecutionContext.global
+ private var blockInfoManager: BlockInfoManager = _
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ blockInfoManager = new BlockInfoManager()
+ for (t <- 0 to 4) {
+ blockInfoManager.registerTask(t)
+ }
+ }
+
+ override protected def afterEach(): Unit = {
+ try {
+ blockInfoManager = null
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ private implicit def stringToBlockId(str: String): BlockId = {
+ TestBlockId(str)
+ }
+
+ private def newBlockInfo(): BlockInfo = {
+ new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)
+ }
+
+ private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
+ try {
+ TaskContext.setTaskContext(new TaskContextImpl(0, 0, taskAttemptId, 0, null, null))
+ block
+ } finally {
+ TaskContext.unset()
+ }
+ }
+
+ test("initial memory usage") {
+ assert(blockInfoManager.size === 0)
+ }
+
+ test("get non-existent block") {
+ assert(blockInfoManager.get("non-existent-block").isEmpty)
+ assert(blockInfoManager.lockForReading("non-existent-block").isEmpty)
+ assert(blockInfoManager.lockForWriting("non-existent-block").isEmpty)
+ }
+
+ test("basic lockNewBlockForWriting") {
+ val initialNumMapEntries = blockInfoManager.getNumberOfMapEntries
+ val blockInfo = newBlockInfo()
+ withTaskId(1) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
+ assert(blockInfoManager.get("block").get eq blockInfo)
+ assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ assert(blockInfoManager.get("block").get eq blockInfo)
+ assert(blockInfo.readerCount === 0)
+ assert(blockInfo.writerTask === 1)
+ blockInfoManager.unlock("block")
+ assert(blockInfo.readerCount === 0)
+ assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+ }
+ assert(blockInfoManager.size === 1)
+ assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
+ }
+
+ test("read locks are reentrant") {
+ withTaskId(1) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ assert(blockInfoManager.lockForReading("block").isDefined)
+ assert(blockInfoManager.lockForReading("block").isDefined)
+ assert(blockInfoManager.get("block").get.readerCount === 2)
+ assert(blockInfoManager.get("block").get.writerTask === BlockInfo.NO_WRITER)
+ blockInfoManager.unlock("block")
+ assert(blockInfoManager.get("block").get.readerCount === 1)
+ blockInfoManager.unlock("block")
+ assert(blockInfoManager.get("block").get.readerCount === 0)
+ }
+ }
+
+ test("multiple tasks can hold read locks") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ }
+ withTaskId(1) { assert(blockInfoManager.lockForReading("block").isDefined) }
+ withTaskId(2) { assert(blockInfoManager.lockForReading("block").isDefined) }
+ withTaskId(3) { assert(blockInfoManager.lockForReading("block").isDefined) }
+ withTaskId(4) { assert(blockInfoManager.lockForReading("block").isDefined) }
+ assert(blockInfoManager.get("block").get.readerCount === 4)
+ }
+
+ test("single task can hold write lock") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ }
+ withTaskId(1) {
+ assert(blockInfoManager.lockForWriting("block").isDefined)
+ assert(blockInfoManager.get("block").get.writerTask === 1)
+ }
+ withTaskId(2) {
+ assert(blockInfoManager.lockForWriting("block", blocking = false).isEmpty)
+ assert(blockInfoManager.get("block").get.writerTask === 1)
+ }
+ }
+
+ test("cannot call lockForWriting while already holding a write lock") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ }
+ withTaskId(1) {
+ assert(blockInfoManager.lockForWriting("block").isDefined)
+ intercept[IllegalStateException] {
+ blockInfoManager.lockForWriting("block")
+ }
+ blockInfoManager.assertBlockIsLockedForWriting("block")
+ }
+ }
+
+ test("assertBlockIsLockedForWriting throws exception if block is not locked") {
+ intercept[SparkException] {
+ blockInfoManager.assertBlockIsLockedForWriting("block")
+ }
+ withTaskId(BlockInfo.NON_TASK_WRITER) {
+ intercept[SparkException] {
+ blockInfoManager.assertBlockIsLockedForWriting("block")
+ }
+ }
+ }
+
+ test("downgrade lock") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.downgradeLock("block")
+ }
+ withTaskId(1) {
+ assert(blockInfoManager.lockForReading("block").isDefined)
+ }
+ assert(blockInfoManager.get("block").get.readerCount === 2)
+ assert(blockInfoManager.get("block").get.writerTask === BlockInfo.NO_WRITER)
+ }
+
+ test("write lock will block readers") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val get1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockForReading("block")
+ }
+ }
+ val get2Future = Future {
+ withTaskId(2) {
+ blockInfoManager.lockForReading("block")
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.unlock("block")
+ }
+ assert(Await.result(get1Future, 1.seconds).isDefined)
+ assert(Await.result(get2Future, 1.seconds).isDefined)
+ assert(blockInfoManager.get("block").get.readerCount === 2)
+ }
+
+ test("read locks will block writer") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ blockInfoManager.lockForReading("block")
+ }
+ val write1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockForWriting("block")
+ }
+ }
+ val write2Future = Future {
+ withTaskId(2) {
+ blockInfoManager.lockForWriting("block")
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.unlock("block")
+ }
+ assert(
+ Await.result(Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined)
+ val firstWriteWinner = if (write1Future.isCompleted) 1 else 2
+ withTaskId(firstWriteWinner) {
+ blockInfoManager.unlock("block")
+ }
+ assert(Await.result(write1Future, 1.seconds).isDefined)
+ assert(Await.result(write2Future, 1.seconds).isDefined)
+ }
+
+ test("removing a non-existent block throws IllegalArgumentException") {
+ withTaskId(0) {
+ intercept[IllegalArgumentException] {
+ blockInfoManager.removeBlock("non-existent-block")
+ }
+ }
+ }
+
+ test("removing a block without holding any locks throws IllegalStateException") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ intercept[IllegalStateException] {
+ blockInfoManager.removeBlock("block")
+ }
+ }
+ }
+
+ test("removing a block while holding only a read lock throws IllegalStateException") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ blockInfoManager.unlock("block")
+ assert(blockInfoManager.lockForReading("block").isDefined)
+ intercept[IllegalStateException] {
+ blockInfoManager.removeBlock("block")
+ }
+ }
+ }
+
+ test("removing a block causes blocked callers to receive None") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val getFuture = Future {
+ withTaskId(1) {
+ blockInfoManager.lockForReading("block")
+ }
+ }
+ val writeFuture = Future {
+ withTaskId(2) {
+ blockInfoManager.lockForWriting("block")
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.removeBlock("block")
+ }
+ assert(Await.result(getFuture, 1.seconds).isEmpty)
+ assert(Await.result(writeFuture, 1.seconds).isEmpty)
+ }
+
+ test("releaseAllLocksForTask releases write locks") {
+ val initialNumMapEntries = blockInfoManager.getNumberOfMapEntries
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 3)
+ blockInfoManager.releaseAllLocksForTask(0)
+ assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries)
+ }
+}
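The new suite above pins down BlockInfoManager's semantics: any number of tasks may hold read locks on a block, at most one task may hold the write lock, and a write lock can be downgraded to a read lock without ever unpinning the block. As a reference point, here is a self-contained, non-blocking toy model of those rules; it is not Spark's implementation, which additionally supports blocking acquisition, per-task bookkeeping, and waking up callers when a block is removed.

import scala.collection.mutable

// Toy model of per-block read/write locking, for illustration only.
class SimpleBlockLocks {
  private val readers = mutable.Map.empty[String, Int].withDefaultValue(0)
  private val writers = mutable.Set.empty[String]

  /** Succeeds unless some task holds the write lock. */
  def tryReadLock(block: String): Boolean = synchronized {
    if (writers.contains(block)) false
    else { readers(block) += 1; true }
  }

  /** Succeeds only when the block has no writer and no readers. */
  def tryWriteLock(block: String): Boolean = synchronized {
    if (writers.contains(block) || readers(block) > 0) false
    else { writers += block; true }
  }

  /** Turn a held write lock into a read lock, keeping the block pinned throughout. */
  def downgrade(block: String): Unit = synchronized {
    require(writers.remove(block), s"$block is not write-locked")
    readers(block) += 1
  }

  def unlockRead(block: String): Unit = synchronized {
    require(readers(block) > 0, s"$block is not read-locked")
    readers(block) -= 1
  }

  def unlockWrite(block: String): Unit = synchronized {
    require(writers.remove(block), s"$block is not write-locked")
  }
}

Unlike lockForReading/lockForWriting in the real class, the try* methods here never block; the "read locks will block writer" and "write lock will block readers" tests above rely on the blocking variants to check that a pending caller wakes up once the conflicting lock is released.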
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 3fd6fb4560..a94d8b424d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,6 +190,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+ stores.head.releaseLock(blockId)
val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
stores.foreach { _.removeBlock(blockId) }
master.removeBlock(blockId)
@@ -251,6 +252,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// Insert a block with 2x replication and return the number of copies of the block
def replicateAndGetNumCopies(blockId: String): Int = {
store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+ store.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@@ -288,6 +290,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+ initialStores.head.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@@ -355,6 +358,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+ stores(0).releaseLock(blockId)
    // Assert that master knows two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -367,6 +371,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
}.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+ testStore.releaseLock(blockId)
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")
@@ -392,6 +397,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
(1 to 10).foreach {
i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+ testStore.releaseLock(s"dummy-block-$i")
}
(1 to 10).foreach {
i => testStore.removeBlock(s"dummy-block-$i")
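With this change, successful put* and get* calls on BlockManager return while still holding a lock on the block, so tests that do not care about pinning must release it explicitly, as the replication suite above now does after every putSingle. The BlockManagerSuite diff below instead imports helpers (`import BlockManagerSuite._`) that bundle the two steps. A plausible sketch of such a helper, assuming only the putSingle/getSingle/releaseLock calls that appear elsewhere in this diff (the real helpers live in BlockManagerSuite's companion object and may differ in shape):

import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}

object BlockManagerTestLockHelpers {
  // Hypothetical test-only wrapper around the lock-holding BlockManager calls.
  implicit class LockReleasingOps(val store: BlockManager) extends AnyVal {
    def putSingleAndReleaseLock(id: BlockId, value: Any, level: StorageLevel): Unit = {
      store.putSingle(id, value, level)
      store.releaseLock(id)     // puts return with the write lock still held
    }

    def getSingleAndReleaseLock(id: BlockId): Option[Any] = {
      val result = store.getSingle(id)
      if (result.isDefined) {
        store.releaseLock(id)   // successful gets return with a read lock held
      }
      result
    }
  }
}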
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e1b2c9633e..e4ab9ee0eb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,6 +45,8 @@ import org.apache.spark.util._
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
with PrivateMethodTester with ResetSystemProperties {
+ import BlockManagerSuite._
+
var conf: SparkConf = null
var store: BlockManager = null
var store2: BlockManager = null
@@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
master: BlockManagerMaster = this.master): BlockManager = {
+ val serializer = new KryoSerializer(conf)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
+ assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
- store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
- store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
- assert(store.getSingle("a1") === None, "a1 not removed from store")
- assert(store.getSingle("a2") === None, "a2 not removed from store")
+ store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+ assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
+ assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
assert(master.getLocations("a2").size === 0, "master did not remove a2")
}
@@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
- store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
- assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
- assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
- assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
+ assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
+ assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
@@ -238,15 +241,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeBlock("a3-to-remove")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- store.getSingle("a1-to-remove") should be (None)
+ assert(!store.hasLocalBlock("a1-to-remove"))
master.getLocations("a1-to-remove") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- store.getSingle("a2-to-remove") should be (None)
+ assert(!store.hasLocalBlock("a2-to-remove"))
master.getLocations("a2-to-remove") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- store.getSingle("a3-to-remove") should not be (None)
+ assert(store.hasLocalBlock("a3-to-remove"))
master.getLocations("a3-to-remove") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -262,30 +265,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory.
- store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- store.getSingle(rdd(0, 0)) should be (None)
+ store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- store.getSingle(rdd(0, 1)) should be (None)
+ store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
master.getLocations(rdd(0, 1)) should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- store.getSingle("nonrddblock") should not be (None)
+ store.getSingleAndReleaseLock("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
- store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = true)
- store.getSingle(rdd(0, 0)) should be (None)
+ store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
- store.getSingle(rdd(0, 1)) should be (None)
+ store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
master.getLocations(rdd(0, 1)) should have size 0
}
@@ -305,54 +308,54 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// insert broadcast blocks in both the stores
Seq(driverStore, executorStore).foreach { case s =>
- s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
- s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
- s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
- s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+ s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+ s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+ s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+ s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
}
// verify whether the blocks exist in both the stores
Seq(driverStore, executorStore).foreach { case s =>
- s.getLocal(broadcast0BlockId) should not be (None)
- s.getLocal(broadcast1BlockId) should not be (None)
- s.getLocal(broadcast2BlockId) should not be (None)
- s.getLocal(broadcast2BlockId2) should not be (None)
+ assert(s.hasLocalBlock(broadcast0BlockId))
+ assert(s.hasLocalBlock(broadcast1BlockId))
+ assert(s.hasLocalBlock(broadcast2BlockId))
+ assert(s.hasLocalBlock(broadcast2BlockId2))
}
// remove broadcast 0 block only from executors
master.removeBroadcast(0, removeFromMaster = false, blocking = true)
// only broadcast 0 block should be removed from the executor store
- executorStore.getLocal(broadcast0BlockId) should be (None)
- executorStore.getLocal(broadcast1BlockId) should not be (None)
- executorStore.getLocal(broadcast2BlockId) should not be (None)
+ assert(!executorStore.hasLocalBlock(broadcast0BlockId))
+ assert(executorStore.hasLocalBlock(broadcast1BlockId))
+ assert(executorStore.hasLocalBlock(broadcast2BlockId))
// nothing should be removed from the driver store
- driverStore.getLocal(broadcast0BlockId) should not be (None)
- driverStore.getLocal(broadcast1BlockId) should not be (None)
- driverStore.getLocal(broadcast2BlockId) should not be (None)
+ assert(driverStore.hasLocalBlock(broadcast0BlockId))
+ assert(driverStore.hasLocalBlock(broadcast1BlockId))
+ assert(driverStore.hasLocalBlock(broadcast2BlockId))
// remove broadcast 0 block from the driver as well
master.removeBroadcast(0, removeFromMaster = true, blocking = true)
- driverStore.getLocal(broadcast0BlockId) should be (None)
- driverStore.getLocal(broadcast1BlockId) should not be (None)
+ assert(!driverStore.hasLocalBlock(broadcast0BlockId))
+ assert(driverStore.hasLocalBlock(broadcast1BlockId))
// remove broadcast 1 block from both the stores asynchronously
// and verify all broadcast 1 blocks have been removed
master.removeBroadcast(1, removeFromMaster = true, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- driverStore.getLocal(broadcast1BlockId) should be (None)
- executorStore.getLocal(broadcast1BlockId) should be (None)
+ assert(!driverStore.hasLocalBlock(broadcast1BlockId))
+ assert(!executorStore.hasLocalBlock(broadcast1BlockId))
}
// remove broadcast 2 from both the stores asynchronously
// and verify all broadcast 2 blocks have been removed
master.removeBroadcast(2, removeFromMaster = true, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- driverStore.getLocal(broadcast2BlockId) should be (None)
- driverStore.getLocal(broadcast2BlockId2) should be (None)
- executorStore.getLocal(broadcast2BlockId) should be (None)
- executorStore.getLocal(broadcast2BlockId2) should be (None)
+ assert(!driverStore.hasLocalBlock(broadcast2BlockId))
+ assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
+ assert(!executorStore.hasLocalBlock(broadcast2BlockId))
+ assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
}
executorStore.stop()
driverStore.stop()
@@ -363,9 +366,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.removeExecutor(store.blockManagerId.executorId)
@@ -381,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -404,12 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
override def run() {
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
}
}
val t3 = new Thread {
@@ -425,8 +429,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
t2.join()
t3.join()
- store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
- store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
store.waitForAsyncReregister()
}
}
@@ -437,9 +441,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
- store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
assert(list1Get.get.data.size === 2)
@@ -479,8 +486,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store2 = makeBlockManager(8000, "executor2")
store3 = makeBlockManager(8000, "executor3")
val list1 = List(new Array[Byte](4000))
- store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store2.putIteratorAndReleaseLock(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store3.putIteratorAndReleaseLock(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
store2.stop()
store2 = null
@@ -506,18 +515,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, storageLevel)
- store.putSingle("a2", a2, storageLevel)
- store.putSingle("a3", a3, storageLevel)
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
- assert(store.getSingle("a1") === None, "a1 was in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ store.putSingleAndReleaseLock("a1", a1, storageLevel)
+ store.putSingleAndReleaseLock("a2", a2, storageLevel)
+ store.putSingleAndReleaseLock("a3", a3, storageLevel)
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+ assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
      // At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingle("a1", a1, storageLevel)
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3") === None, "a3 was in store")
+ store.putSingleAndReleaseLock("a1", a1, storageLevel)
+ assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
}
test("in-memory LRU for partitions of same RDD") {
@@ -525,34 +534,34 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
- assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
- assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
- assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
// Check that rdd_0_3 doesn't replace them even after further accesses
- assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
- assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
- assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
}
test("in-memory LRU for partitions of multiple RDDs") {
store = makeBlockManager(12000)
- store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
// Do a get() on rdd_0_2 so that it is the most recently used item
- assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+ assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
- store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -567,28 +576,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
- store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
- store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
- assert(store.getSingle("a2").isDefined, "a2 was in store")
- assert(store.getSingle("a3").isDefined, "a3 was in store")
- assert(store.getSingle("a1").isDefined, "a1 was in store")
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+ store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
+ store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
+ assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
+ assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
}
test("disk and memory storage") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
}
test("disk and memory storage with getLocalBytes") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
}
test("disk and memory storage with serialization") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
}
test("disk and memory storage with serialization and getLocalBytes") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
}
def testDiskAndMemoryStorage(
@@ -598,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, storageLevel)
- store.putSingle("a2", a2, storageLevel)
- store.putSingle("a3", a3, storageLevel)
+ store.putSingleAndReleaseLock("a1", a1, storageLevel)
+ store.putSingleAndReleaseLock("a2", a2, storageLevel)
+ store.putSingleAndReleaseLock("a3", a3, storageLevel)
assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -615,19 +624,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
val a4 = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
- store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
+ assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
- store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
- assert(store.getSingle("a1") == None, "a1 was in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
- assert(store.getSingle("a4").isDefined, "a4 was not in store")
+ store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+ assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+ assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store")
}
test("in-memory LRU with streams") {
@@ -635,23 +644,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
- store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- assert(store.get("list2").isDefined, "list2 was not in store")
+ store.putIteratorAndReleaseLock(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
- assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
- assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+ assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
      // At this point list2 was gotten last, so LRU will get rid of list3
- store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- assert(store.get("list1").isDefined, "list1 was not in store")
+ store.putIteratorAndReleaseLock(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
- assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
- assert(store.get("list3") === None, "list1 was in store")
+ assert(store.getAndReleaseLock("list3") === None, "list1 was in store")
}
test("LRU with mixed storage levels and streams") {
@@ -661,33 +674,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
// First store list1 and list2, both in memory, and list3, on disk only
- store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val listForSizeEstimate = new ArrayBuffer[Any]
listForSizeEstimate ++= list1.iterator
val listSize = SizeEstimator.estimate(listForSizeEstimate)
// At this point LRU should not kick in because list3 is only on disk
- assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
- assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
- assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
- assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
- assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
- assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
- assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2").isDefined, "list2 was not in store")
+ store.putIteratorAndReleaseLock(
+ "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+ assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+ assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
- assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
- assert(store.get("list4").isDefined, "list4 was not in store")
+ assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store")
assert(store.get("list4").get.data.size === 2)
}
@@ -705,18 +722,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("overly large block") {
store = makeBlockManager(5000)
- store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a1") === None, "a1 was in store")
- store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+ store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+ store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
}
test("block compression") {
try {
conf.set("spark.shuffle.compress", "true")
store = makeBlockManager(20000, "exec1")
- store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock(
+ ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
store.stop()
@@ -724,7 +742,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.shuffle.compress", "false")
store = makeBlockManager(20000, "exec2")
- store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock(
+ ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
"shuffle_0_0_0 was compressed")
store.stop()
@@ -732,7 +751,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "true")
store = makeBlockManager(20000, "exec3")
- store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock(
+ BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
"broadcast_0 was not compressed")
store.stop()
@@ -740,28 +760,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "false")
store = makeBlockManager(20000, "exec4")
- store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock(
+ BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "true")
store = makeBlockManager(20000, "exec5")
- store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "false")
store = makeBlockManager(20000, "exec6")
- store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
store = makeBlockManager(20000, "exec7")
- store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
store.stop()
store = null
@@ -789,12 +810,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
class UnserializableClass
val a1 = new UnserializableClass
intercept[java.io.NotSerializableException] {
- store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+ store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
}
    // Make sure getting a1 doesn't hang and returns None.
failAfter(1 second) {
- assert(store.getSingle("a1").isEmpty, "a1 should not be in store")
+ assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
}
}
@@ -844,6 +865,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("updated block statuses") {
store = makeBlockManager(12000)
+ store.registerTask(0)
val list = List.fill(2)(new Array[Byte](2000))
val bigList = List.fill(8)(new Array[Byte](2000))
@@ -860,7 +882,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list1)
val updatedBlocks1 = getUpdatedBlocks {
- store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks1.size === 1)
assert(updatedBlocks1.head._1 === TestBlockId("list1"))
@@ -868,7 +891,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list2)
val updatedBlocks2 = getUpdatedBlocks {
- store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
assert(updatedBlocks2.size === 1)
assert(updatedBlocks2.head._1 === TestBlockId("list2"))
@@ -876,7 +900,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 = getUpdatedBlocks {
- store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks3.size === 2)
updatedBlocks3.foreach { case (id, status) =>
@@ -890,7 +915,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 = getUpdatedBlocks {
- store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks4.size === 2)
updatedBlocks4.foreach { case (id, status) =>
@@ -905,7 +931,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// No updated blocks - list5 is too big to fit in store and nothing is kicked out
val updatedBlocks5 = getUpdatedBlocks {
- store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks5.size === 0)
@@ -929,9 +956,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](2000))
// Tell master. By LRU, only list2 and list3 remains.
- store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
assert(store.master.getLocations("list1").size === 0)
@@ -945,9 +975,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
// This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
- store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
- store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putIteratorAndReleaseLock(
+ "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putIteratorAndReleaseLock(
+ "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIteratorAndReleaseLock(
+ "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
// getLocations should return nothing because the master is not informed
// getBlockStatus without asking slaves should have the same result
@@ -968,9 +1001,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](100))
// insert some blocks
- store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
@@ -979,9 +1015,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
=== 1)
// insert some more blocks
- store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIteratorAndReleaseLock(
+ "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIteratorAndReleaseLock(
+ "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// getLocations and getBlockStatus should yield the same locations
assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
@@ -991,7 +1030,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
- store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIteratorAndReleaseLock(
+ blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
case RDDBlockId(1, _) => true
@@ -1002,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = makeBlockManager(12000)
- store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
- assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+ assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
- store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even though it's not the least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1086,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
memoryStore.releasePendingUnrollMemoryForThisTask()
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
- store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1098,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the mean time, however, we kicked out someBlock2 before giving up.
- store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1130,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// would not know how to drop them from memory later.
memoryStore.remove("b1")
memoryStore.remove("b2")
- store.putIterator("b1", smallIterator, memOnly)
- store.putIterator("b2", smallIterator, memOnly)
+ store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
+ store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1142,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.contains("b3"))
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
memoryStore.remove("b3")
- store.putIterator("b3", smallIterator, memOnly)
+ store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1169,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- store.putIterator("b1", smallIterator, memAndDisk)
- store.putIterator("b2", smallIterator, memAndDisk)
+ store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
+ store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1183,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(!diskStore.contains("b2"))
assert(!diskStore.contains("b3"))
memoryStore.remove("b3")
- store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+ store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1244,6 +1284,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(12000)
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
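+ // memoryStore.putBytes is called directly below, bypassing BlockManager.putBytes, so the
+ // block has to be registered and write-locked with the BlockInfoManager here first.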
+ store.blockInfoManager.lockNewBlockForWriting(
+ blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
val result = memoryStore.putBytes(blockId, 13000, () => {
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
})
@@ -1263,4 +1305,104 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(result.size === 10000)
assert(result.data === Right(bytes))
}
+
+ test("read-locked blocks cannot be evicted from the MemoryStore") {
+ store = makeBlockManager(12000)
+ val arr = new Array[Byte](4000)
+ // First store a1 and a2, both in memory; the plain getSingle() calls below leave them read-locked
+ store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ // This put should fail because both a1 and a2 should be read-locked:
+ store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+ assert(store.getSingle("a3").isEmpty, "a3 was in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ // Release both pins of block a2:
+ store.releaseLock("a2")
+ store.releaseLock("a2")
+ // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
+ // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
+ store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+ assert(store.getSingle("a2").isEmpty, "a2 was in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
+ }
+}
+
+private object BlockManagerSuite {
+
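+ /**
+ * Test wrappers around [[BlockManager]] methods: a successful put or get now leaves the
+ * calling task holding a lock on the block, so these helpers release that lock immediately
+ * for tests that only care about block contents and placement, not pinning.
+ */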
+ private implicit class BlockManagerTestUtils(store: BlockManager) {
+
+ def putSingleAndReleaseLock(
+ block: BlockId,
+ value: Any,
+ storageLevel: StorageLevel,
+ tellMaster: Boolean): Unit = {
+ if (store.putSingle(block, value, storageLevel, tellMaster)) {
+ store.releaseLock(block)
+ }
+ }
+
+ def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
+ if (store.putSingle(block, value, storageLevel)) {
+ store.releaseLock(block)
+ }
+ }
+
+ def putIteratorAndReleaseLock(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel): Unit = {
+ if (store.putIterator(blockId, values, level)) {
+ store.releaseLock(blockId)
+ }
+ }
+
+ def putIteratorAndReleaseLock(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ tellMaster: Boolean): Unit = {
+ if (store.putIterator(blockId, values, level, tellMaster)) {
+ store.releaseLock(blockId)
+ }
+ }
+
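+ // Write-locks the block, drops it from memory, and then cleans up either the lock or the
+ // whole block info, depending on whether the block still exists in another store.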
+ def dropFromMemoryIfExists(
+ blockId: BlockId,
+ data: () => Either[Array[Any], ByteBuffer]): Unit = {
+ store.blockInfoManager.lockForWriting(blockId).foreach { info =>
+ val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
+ if (newEffectiveStorageLevel.isValid) {
+ // The block is still present in at least one store, so release the lock
+ // but don't delete the block info
+ store.releaseLock(blockId)
+ } else {
+ // The block isn't present in any store, so delete the block info so that the
+ // block can be stored again
+ store.blockInfoManager.removeBlock(blockId)
+ }
+ }
+ }
+
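+ // Wraps a getter so that the lock acquired by a successful lookup is released before the
+ // result is handed back to the test.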
+ private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => {
+ val result = f(blockId)
+ if (result.isDefined) {
+ store.releaseLock(blockId)
+ }
+ result
+ }
+
+ def hasLocalBlock(blockId: BlockId): Boolean = {
+ getLocalAndReleaseLock(blockId).isDefined
+ }
+
+ val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+ val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
+ val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
+ val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+ }
+
}
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
index f55b884bc4..631d767715 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.Unpooled;
/**
* A {@link ManagedBuffer} backed by {@link ByteBuffer}.
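+ * This class is not declared final so that it can be extended (see BlockManagerManagedBuffer).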
*/
-public final class NioManagedBuffer extends ManagedBuffer {
+public class NioManagedBuffer extends ManagedBuffer {
private final ByteBuffer buf;
public NioManagedBuffer(ByteBuffer buf) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 83d7953aaf..efa2eeaf4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -46,7 +46,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
def isMaterialized(rddId: Int): Boolean = {
- sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
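+ // A successful get() leaves the block locked; release it before checking the result.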
+ val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+ maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+ maybeBlock.nonEmpty
}
test("withColumn doesn't invalidate cached dataframe") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 11863caffe..86f02e68e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -40,7 +40,9 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
}
def isMaterialized(rddId: Int): Boolean = {
- sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
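+ // A successful get() leaves the block locked; release it before checking the result.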
+ val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+ maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+ maybeBlock.nonEmpty
}
test("cache table") {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index e22e320b17..3d9c085013 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,6 +91,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
+ } else {
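+ // The put succeeded, so this task still holds a lock on the block; release it here.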
+ blockManager.releaseLock(blockId)
}
BlockManagerBasedStoreResult(blockId, numRecords)
}
@@ -189,6 +191,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
+ } else {
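+ // The put succeeded, so this task still holds a lock on the block; release it here.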
+ blockManager.releaseLock(blockId)
}
}