-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala  179
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala  44
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala  33
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala  107
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  127
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStore.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala  28
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala  97
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala  73
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala  5
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala  235
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala  4
16 files changed, 365 insertions, 597 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
deleted file mode 100644
index 2b456facd9..0000000000
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import scala.collection.mutable
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-import org.apache.spark.util.CompletionIterator
-
-/**
- * Spark class responsible for passing RDDs partition contents to the BlockManager and making
- * sure a node doesn't load two copies of an RDD at once.
- */
-private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-
- /** Keys of RDD partitions that are being computed/loaded. */
- private val loading = new mutable.HashSet[RDDBlockId]
-
- /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
- def getOrCompute[T](
- rdd: RDD[T],
- partition: Partition,
- context: TaskContext,
- storageLevel: StorageLevel): Iterator[T] = {
-
- val key = RDDBlockId(rdd.id, partition.index)
- logDebug(s"Looking for partition $key")
- blockManager.get(key) match {
- case Some(blockResult) =>
- // Partition is already materialized, so just return its values
- val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
- existingMetrics.incBytesReadInternal(blockResult.bytes)
-
- val iter = blockResult.data.asInstanceOf[Iterator[T]]
-
- new InterruptibleIterator[T](context, iter) {
- override def next(): T = {
- existingMetrics.incRecordsReadInternal(1)
- delegate.next()
- }
- }
- case None =>
- // Acquire a lock for loading this partition
- // If another thread already holds the lock, wait for it to finish and return its results
- val storedValues = acquireLockForPartition[T](key)
- if (storedValues.isDefined) {
- return new InterruptibleIterator[T](context, storedValues.get)
- }
-
- // Otherwise, we have to load the partition ourselves
- try {
- logInfo(s"Partition $key not found, computing it")
- val computedValues = rdd.computeOrReadCheckpoint(partition, context)
- val cachedValues = putInBlockManager(key, computedValues, storageLevel)
- new InterruptibleIterator(context, cachedValues)
- } finally {
- loading.synchronized {
- loading.remove(key)
- loading.notifyAll()
- }
- }
- }
- }
-
- /**
- * Acquire a loading lock for the partition identified by the given block ID.
- *
- * If the lock is free, just acquire it and return None. Otherwise, another thread is already
- * loading the partition, so we wait for it to finish and return the values loaded by the thread.
- */
- private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
- loading.synchronized {
- if (!loading.contains(id)) {
- // If the partition is free, acquire its lock to compute its value
- loading.add(id)
- None
- } else {
- // Otherwise, wait for another thread to finish and return its result
- logInfo(s"Another thread is loading $id, waiting for it to finish...")
- while (loading.contains(id)) {
- try {
- loading.wait()
- } catch {
- case e: Exception =>
- logWarning(s"Exception while waiting for another thread to load $id", e)
- }
- }
- logInfo(s"Finished waiting for $id")
- val values = blockManager.get(id)
- if (!values.isDefined) {
- /* The block is not guaranteed to exist even after the other thread has finished.
- * For instance, the block could be evicted after it was put, but before our get.
- * In this case, we still need to load the partition ourselves. */
- logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
- loading.add(id)
- }
- values.map(_.data.asInstanceOf[Iterator[T]])
- }
- }
- }
-
- /**
- * Cache the values of a partition, keeping track of any updates in the storage statuses of
- * other blocks along the way.
- *
- * The effective storage level refers to the level that actually specifies BlockManager put
- * behavior, not the level originally specified by the user. This is mainly for forcing a
- * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
- * while preserving the original semantics of the RDD as specified by the application.
- */
- private def putInBlockManager[T](
- key: BlockId,
- values: Iterator[T],
- level: StorageLevel,
- effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
-
- val putLevel = effectiveStorageLevel.getOrElse(level)
- if (!putLevel.useMemory) {
- /*
- * This RDD is not to be cached in memory, so we can just pass the computed values as an
- * iterator directly to the BlockManager rather than first fully unrolling it in memory.
- */
- blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
- blockManager.get(key) match {
- case Some(v) => v.data.asInstanceOf[Iterator[T]]
- case None =>
- logInfo(s"Failure to store $key")
- throw new BlockException(key, s"Block manager failed to return cached value for $key!")
- }
- } else {
- /*
- * This RDD is to be cached in memory. In this case we cannot pass the computed values
- * to the BlockManager as an iterator and expect to read it back later. This is because
- * we may end up dropping a partition from memory store before getting it back.
- *
- * In addition, we must be careful to not unroll the entire partition in memory at once.
- * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
- * single partition. Instead, we unroll the values cautiously, potentially aborting and
- * dropping the partition to disk if applicable.
- */
- blockManager.memoryStore.unrollSafely(key, values) match {
- case Left(arr) =>
- // We have successfully unrolled the entire partition, so cache it in memory
- blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
- CompletionIterator[T, Iterator[T]](
- arr.iterator.asInstanceOf[Iterator[T]],
- blockManager.releaseLock(key))
- case Right(it) =>
- // There is not enough space to cache this partition in memory
- val returnValues = it.asInstanceOf[Iterator[T]]
- if (putLevel.useDisk) {
- logWarning(s"Persisting partition $key to disk instead.")
- val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
- useOffHeap = false, deserialized = false, putLevel.replication)
- putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
- } else {
- returnValues
- }
- }
- }
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 204f7356f7..b3b3729625 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -56,7 +56,6 @@ class SparkEnv (
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
- val cacheManager: CacheManager,
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
@@ -333,8 +332,6 @@ object SparkEnv extends Logging {
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
- val cacheManager = new CacheManager(blockManager)
-
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
@@ -371,7 +368,6 @@ object SparkEnv extends Logging {
rpcEnv,
serializer,
closureSerializer,
- cacheManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index c08f87a8b4..dabc810018 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -99,18 +99,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
val blockManager = SparkEnv.get.blockManager
- if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
- blockManager.releaseLock(broadcastId)
- } else {
+ if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
val pieceId = BroadcastBlockId(id, "piece" + i)
- if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
- blockManager.releaseLock(pieceId)
- } else {
+ if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
@@ -130,22 +126,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// First try getLocalBytes because there is a chance that previous attempts to fetch the
// broadcast blocks have already fetched some of the blocks. In that case, some blocks
// would be available locally (on this executor).
- def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
- def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
- // If we found the block from remote executors/driver's BlockManager, put the block
- // in this executor's BlockManager.
- if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
- throw new SparkException(
- s"Failed to store $pieceId of $broadcastId in local BlockManager")
- }
- block
+ bm.getLocalBytes(pieceId) match {
+ case Some(block) =>
+ blocks(pid) = block
+ releaseLock(pieceId)
+ case None =>
+ bm.getRemoteBytes(pieceId) match {
+ case Some(b) =>
+ // We found the block from remote executors/driver's BlockManager, so put the block
+ // in this executor's BlockManager.
+ if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+ throw new SparkException(
+ s"Failed to store $pieceId of $broadcastId in local BlockManager")
+ }
+ blocks(pid) = b
+ case None =>
+ throw new SparkException(s"Failed to get $pieceId of $broadcastId")
+ }
}
- val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
- throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
- // At this point we are guaranteed to hold a read lock, since we either got the block locally
- // or stored the remotely-fetched block and automatically downgraded the write lock.
- blocks(pid) = block
- releaseLock(pieceId)
}
blocks
}
@@ -191,9 +189,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
val storageLevel = StorageLevel.MEMORY_AND_DISK
- if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
- releaseLock(broadcastId)
- } else {
+ if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
obj
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a959f200d4..e88d6cd089 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -292,11 +292,8 @@ private[spark] class Executor(
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
- val putSucceeded = env.blockManager.putBytes(
+ env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
- if (putSucceeded) {
- env.blockManager.releaseLock(blockId)
- }
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index e4246df83a..e86933b948 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -66,10 +66,7 @@ class NettyBlockRpcServer(
serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
- val putSucceeded = blockManager.putBlockData(blockId, data, level)
- if (putSucceeded) {
- blockManager.releaseLock(blockId)
- }
+ blockManager.putBlockData(blockId, data, level)
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6a6ad2d75a..e5fdebc65d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
@@ -272,7 +272,7 @@ abstract class RDD[T: ClassTag](
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
- SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
+ getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
@@ -315,6 +315,35 @@ abstract class RDD[T: ClassTag](
}
/**
+ * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
+ */
+ private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
+ val blockId = RDDBlockId(id, partition.index)
+ var readCachedBlock = true
+ // This method is called on executors, so we need to call SparkEnv.get instead of sc.env.
+ SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
+ readCachedBlock = false
+ computeOrReadCheckpoint(partition, context)
+ }) match {
+ case Left(blockResult) =>
+ if (readCachedBlock) {
+ val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
+ existingMetrics.incBytesReadInternal(blockResult.bytes)
+ new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
+ override def next(): T = {
+ existingMetrics.incRecordsReadInternal(1)
+ delegate.next()
+ }
+ }
+ } else {
+ new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+ }
+ case Right(iter) =>
+ new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
+ }
+ }
+
+ /**
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
*
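For reference, a minimal caller-side sketch (not part of this patch) of the getOrElseUpdate contract that getOrCompute above relies on: Left carries a BlockResult when the block was stored or already present, while Right hands the computed values back when they could not be cached. The block id and the compute closure below are illustrative placeholders.

import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

def cacheOrCompute(compute: () => Iterator[Any]): Iterator[Any] = {
  val bm = SparkEnv.get.blockManager
  val blockId = TestBlockId("example")  // illustrative id, not from this patch
  bm.getOrElseUpdate(blockId, StorageLevel.MEMORY_AND_DISK, compute) match {
    case Left(blockResult) =>
      // The block was stored (or already existed); read the cached values back.
      blockResult.data
    case Right(iter) =>
      // The block could not be cached (e.g. too large to unroll with no disk level),
      // so the computed values are handed back for direct consumption.
      iter
  }
}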
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 0eda97e58d..b23244ad51 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -71,27 +71,13 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
_writerTask = t
checkInvariants()
}
- private[this] var _writerTask: Long = 0
-
- /**
- * True if this block has been removed from the BlockManager and false otherwise.
- * This field is used to communicate block deletion to blocked readers / writers (see its usage
- * in [[BlockInfoManager]]).
- */
- def removed: Boolean = _removed
- def removed_=(r: Boolean): Unit = {
- _removed = r
- checkInvariants()
- }
- private[this] var _removed: Boolean = false
+ private[this] var _writerTask: Long = BlockInfo.NO_WRITER
private def checkInvariants(): Unit = {
// A block's reader count must be non-negative:
assert(_readerCount >= 0)
// A block is either locked for reading or for writing, but not for both at the same time:
assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
- // If a block is removed then it is not locked:
- assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
}
checkInvariants()
@@ -195,16 +181,22 @@ private[storage] class BlockInfoManager extends Logging {
blockId: BlockId,
blocking: Boolean = true): Option[BlockInfo] = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
- infos.get(blockId).map { info =>
- while (info.writerTask != BlockInfo.NO_WRITER) {
- if (blocking) wait() else return None
+ do {
+ infos.get(blockId) match {
+ case None => return None
+ case Some(info) =>
+ if (info.writerTask == BlockInfo.NO_WRITER) {
+ info.readerCount += 1
+ readLocksByTask(currentTaskAttemptId).add(blockId)
+ logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
+ return Some(info)
+ }
}
- if (info.removed) return None
- info.readerCount += 1
- readLocksByTask(currentTaskAttemptId).add(blockId)
- logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
- info
- }
+ if (blocking) {
+ wait()
+ }
+ } while (blocking)
+ None
}
/**
@@ -226,21 +218,25 @@ private[storage] class BlockInfoManager extends Logging {
blockId: BlockId,
blocking: Boolean = true): Option[BlockInfo] = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
- infos.get(blockId).map { info =>
- if (info.writerTask == currentTaskAttemptId) {
- throw new IllegalStateException(
- s"Task $currentTaskAttemptId has already locked $blockId for writing")
- } else {
- while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
- if (blocking) wait() else return None
- }
- if (info.removed) return None
+ do {
+ infos.get(blockId) match {
+ case None => return None
+ case Some(info) =>
+ if (info.writerTask == currentTaskAttemptId) {
+ throw new IllegalStateException(
+ s"Task $currentTaskAttemptId has already locked $blockId for writing")
+ } else if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
+ info.writerTask = currentTaskAttemptId
+ writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+ logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
+ return Some(info)
+ }
}
- info.writerTask = currentTaskAttemptId
- writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
- logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
- info
- }
+ if (blocking) {
+ wait()
+ }
+ } while (blocking)
+ None
}
/**
@@ -306,29 +302,30 @@ private[storage] class BlockInfoManager extends Logging {
}
/**
- * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
- * exist.
+ * Attempt to acquire the appropriate lock for writing a new block.
+ *
+ * This enforces the first-writer-wins semantics. If we are the first to write the block,
+ * then just go ahead and acquire the write lock. Otherwise, if another thread is already
+ * writing the block, then we wait for the write to finish before acquiring the read lock.
*
- * @param blockId the block id.
- * @param newBlockInfo the block info for the new block.
* @return true if the block did not already exist, false otherwise. If this returns false, then
- * no new locks are acquired. If this returns true, a write lock on the new block will
- * be held.
+ * a read lock on the existing block will be held. If this returns true, a write lock on
+ * the new block will be held.
*/
def lockNewBlockForWriting(
blockId: BlockId,
newBlockInfo: BlockInfo): Boolean = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
- if (!infos.contains(blockId)) {
- infos(blockId) = newBlockInfo
- newBlockInfo.writerTask = currentTaskAttemptId
- writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
- logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
- true
- } else {
- logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
- s"because that block already exists")
- false
+ lockForReading(blockId) match {
+ case Some(info) =>
+ // Block already exists. This could happen if another thread races with us to compute
+ // the same block. In this case, just keep the read lock and return.
+ false
+ case None =>
+ // Block does not yet exist or is removed, so we are free to acquire the write lock
+ infos(blockId) = newBlockInfo
+ lockForWriting(blockId)
+ true
}
}
@@ -418,7 +415,6 @@ private[storage] class BlockInfoManager extends Logging {
infos.remove(blockId)
blockInfo.readerCount = 0
blockInfo.writerTask = BlockInfo.NO_WRITER
- blockInfo.removed = true
}
case None =>
throw new IllegalArgumentException(
@@ -434,7 +430,6 @@ private[storage] class BlockInfoManager extends Logging {
infos.valuesIterator.foreach { blockInfo =>
blockInfo.readerCount = 0
blockInfo.writerTask = BlockInfo.NO_WRITER
- blockInfo.removed = true
}
infos.clear()
readLocksByTask.clear()
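As a standalone illustration (a toy model, not the real BlockInfoManager), the first-writer-wins behavior that lockNewBlockForWriting now provides can be sketched with a monitor: the first caller registers the entry and holds the write lock, while later callers block until the writer finishes and then come back holding a shared read lock on the existing entry.

import scala.collection.mutable

// Toy sketch of first-writer-wins locking (illustrative only; the real
// BlockInfoManager also tracks task attempt ids and per-task lock sets).
class ToyLockManager {
  private case class Entry(var readers: Int = 0, var writing: Boolean = false)
  private val entries = mutable.HashMap[String, Entry]()

  // Returns true if we created the entry and hold the write lock,
  // false if the entry already existed and we now hold a read lock on it.
  def lockNewForWriting(id: String): Boolean = synchronized {
    while (true) {
      entries.get(id) match {
        case None =>
          entries(id) = Entry(writing = true)   // first writer wins
          return true
        case Some(e) if !e.writing =>
          e.readers += 1                        // block already written: take a read lock
          return false
        case Some(_) =>
          wait()                                // another caller is writing; block until notified
      }
    }
    throw new IllegalStateException("unreachable")
  }

  def unlock(id: String): Unit = synchronized {
    val e = entries(id)
    if (e.writing) e.writing = false else e.readers -= 1
    notifyAll()                                 // wake up blocked callers
  }
}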
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 29124b368e..b59191b291 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -44,8 +44,7 @@ import org.apache.spark.util._
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
+private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@@ -648,8 +647,38 @@ private[spark] class BlockManager(
}
/**
- * @return true if the block was stored or false if the block was already stored or an
- * error occurred.
+ * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
+ * to compute the block, persist it, and return its values.
+ *
+ * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+ * could not be cached.
+ */
+ def getOrElseUpdate(
+ blockId: BlockId,
+ level: StorageLevel,
+ makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
+ // Initially we hold no locks on this block.
+ doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
+ case None =>
+ // doPut() didn't hand work back to us, so the block already existed or was successfully
+ // stored. Therefore, we now hold a read lock on the block.
+ val blockResult = get(blockId).getOrElse {
+ // Since we held a read lock between the doPut() and get() calls, the block should not
+ // have been evicted, so get() not returning the block indicates some internal error.
+ releaseLock(blockId)
+ throw new SparkException(s"get() failed for block $blockId even though we held a lock")
+ }
+ Left(blockResult)
+ case Some(failedPutResult) =>
+ // The put failed, likely because the data was too large to fit in memory and could not be
+ // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
+ // that they can decide what to do with the values (e.g. process them without caching).
+ Right(failedPutResult.data.left.get)
+ }
+ }
+
+ /**
+ * @return true if the block was stored or false if an error occurred.
*/
def putIterator(
blockId: BlockId,
@@ -658,7 +687,7 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
- doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
+ doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
}
/**
@@ -679,26 +708,9 @@ private[spark] class BlockManager(
}
/**
- * Put a new block of values to the block manager.
- *
- * @return true if the block was stored or false if the block was already stored or an
- * error occurred.
- */
- def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
- require(values != null, "Values is null")
- doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
- }
-
- /**
* Put a new block of serialized bytes to the block manager.
*
- * @return true if the block was stored or false if the block was already stored or an
- * error occurred.
+ * @return true if the block was stored or false if an error occurred.
*/
def putBytes(
blockId: BlockId,
@@ -707,26 +719,32 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(bytes != null, "Bytes is null")
- doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+ doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
}
/**
* Put the given block according to the given level in one of the block stores, replicating
* the values if necessary.
*
- * The effective storage level refers to the level according to which the block will actually be
- * handled. This allows the caller to specify an alternate behavior of doPut while preserving
- * the original level specified by the user.
+ * If the block already exists, this method will not overwrite it.
*
- * @return true if the block was stored or false if the block was already stored or an
- * error occurred.
+ * @param effectiveStorageLevel the level according to which the block will actually be handled.
+ * This allows the caller to specify an alternate behavior of doPut
+ * while preserving the original level specified by the user.
+ * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+ * block already exists). If false, this method will hold no locks when it
+ * returns.
+ * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
+ * or None if the block already existed or was successfully stored (fully consuming
+ * the input data / input iterator).
*/
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+ effectiveStorageLevel: Option[StorageLevel] = None,
+ keepReadLock: Boolean = false): Option[PutResult] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -743,7 +761,11 @@ private[spark] class BlockManager(
newInfo
} else {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
- return false
+ if (!keepReadLock) {
+ // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
+ releaseLock(blockId)
+ }
+ return None
}
}
@@ -779,6 +801,7 @@ private[spark] class BlockManager(
}
var blockWasSuccessfullyStored = false
+ var result: PutResult = null
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
@@ -803,11 +826,9 @@ private[spark] class BlockManager(
}
// Actually put the values
- val result = data match {
+ result = data match {
case IteratorValues(iterator) =>
- blockStore.putIterator(blockId, iterator, putLevel, returnValues)
- case ArrayValues(array) =>
- blockStore.putArray(blockId, array, putLevel, returnValues)
+ blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
@@ -834,7 +855,11 @@ private[spark] class BlockManager(
}
} finally {
if (blockWasSuccessfullyStored) {
- blockInfoManager.downgradeLock(blockId)
+ if (keepReadLock) {
+ blockInfoManager.downgradeLock(blockId)
+ } else {
+ blockInfoManager.unlock(blockId)
+ }
} else {
blockInfoManager.removeBlock(blockId)
logWarning(s"Putting block $blockId failed")
@@ -852,18 +877,20 @@ private[spark] class BlockManager(
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
- val remoteStartTime = System.currentTimeMillis
- // Serialize the block if not already done
- if (bytesAfterPut == null) {
- if (valuesAfterPut == null) {
- throw new SparkException(
- "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+ if (blockWasSuccessfullyStored) {
+ val remoteStartTime = System.currentTimeMillis
+ // Serialize the block if not already done
+ if (bytesAfterPut == null) {
+ if (valuesAfterPut == null) {
+ throw new SparkException(
+ "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+ }
+ bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
- bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
+ replicate(blockId, bytesAfterPut, putLevel)
+ logDebug("Put block %s remotely took %s"
+ .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
- replicate(blockId, bytesAfterPut, putLevel)
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
@@ -877,7 +904,11 @@ private[spark] class BlockManager(
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
- blockWasSuccessfullyStored
+ if (blockWasSuccessfullyStored) {
+ None
+ } else {
+ Some(result)
+ }
}
/**
@@ -1033,7 +1064,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
- diskStore.putArray(blockId, elements, level, returnValues = false)
+ diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 6f6a6773ba..d3af50d974 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -19,8 +19,6 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.Logging
/**
@@ -43,12 +41,6 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
level: StorageLevel,
returnValues: Boolean): PutResult
- def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult
-
/**
* Return the size of a block in bytes.
*/
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1f3f193f2f..bfa6560a72 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -58,14 +58,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
- override def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIterator(blockId, values.toIterator, level, returnValues)
- }
-
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 2f16c8f3d8..317d73abba 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -120,22 +120,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
PutResult(size, data)
}
- override def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- if (level.deserialized) {
- val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
- tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
- PutResult(sizeEstimate, Left(values.iterator))
- } else {
- val bytes = blockManager.dataSerialize(blockId, values.iterator)
- tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
- }
- }
-
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
@@ -166,7 +150,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
- val res = putArray(blockId, arrayValues, level, returnValues)
+ val res = {
+ if (level.deserialized) {
+ val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+ tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
+ PutResult(sizeEstimate, Left(arrayValues.iterator))
+ } else {
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+ tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+ }
PutResult(res.size, res.data)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
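The inlined code above preserves unrollSafely's Left/Right contract. A rough standalone sketch of that idea (illustrative only; the real MemoryStore bounds the unroll by estimated memory usage and reserves/releases unroll space rather than counting elements):

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Unroll an iterator cautiously: if everything fits within the budget, return the
// fully materialized array (Left); otherwise hand back an iterator over what was
// already consumed followed by the remainder (Right).
def unrollCautiously[T: ClassTag](values: Iterator[T], maxElements: Int): Either[Array[T], Iterator[T]] = {
  val buffer = new ArrayBuffer[T]
  while (values.hasNext && buffer.size < maxElements) {
    buffer += values.next()
  }
  if (!values.hasNext) {
    Left(buffer.toArray)              // fully unrolled: safe to cache as an array
  } else {
    Right(buffer.iterator ++ values)  // too large: caller can still consume or spill to disk
  }
}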
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
deleted file mode 100644
index ffc02bcb01..0000000000
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-
-// TODO: Test the CacheManager's thread-safety aspects
-class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter
- with MockitoSugar {
-
- var blockManager: BlockManager = _
- var cacheManager: CacheManager = _
- var split: Partition = _
- /** An RDD which returns the values [1, 2, 3, 4]. */
- var rdd: RDD[Int] = _
- var rdd2: RDD[Int] = _
- var rdd3: RDD[Int] = _
-
- before {
- sc = new SparkContext("local", "test")
- blockManager = mock[BlockManager]
- cacheManager = new CacheManager(blockManager)
- split = new Partition { override def index: Int = 0 }
- rdd = new RDD[Int](sc, Nil) {
- override def getPartitions: Array[Partition] = Array(split)
- override val getDependencies = List[Dependency[_]]()
- override def compute(split: Partition, context: TaskContext): Iterator[Int] =
- Array(1, 2, 3, 4).iterator
- }
- rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
- override def getPartitions: Array[Partition] = firstParent[Int].partitions
- override def compute(split: Partition, context: TaskContext): Iterator[Int] =
- firstParent[Int].iterator(split, context)
- }.cache()
- rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
- override def getPartitions: Array[Partition] = firstParent[Int].partitions
- override def compute(split: Partition, context: TaskContext): Iterator[Int] =
- firstParent[Int].iterator(split, context)
- }.cache()
- }
-
- test("get uncached rdd") {
- // Do not mock this test, because attempting to match Array[Any], which is not covariant,
- // in blockManager.put is a losing battle. You have been warned.
- blockManager = sc.env.blockManager
- cacheManager = sc.env.cacheManager
- val context = TaskContext.empty()
- val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
- assert(computeValue.toList === List(1, 2, 3, 4))
- assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
- assert(getValue.get.data.toList === List(1, 2, 3, 4))
- }
-
- test("get cached rdd") {
- val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
- when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
-
- val context = TaskContext.empty()
- val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- assert(value.toList === List(5, 6, 7))
- }
-
- test("verify task metrics updated correctly") {
- cacheManager = sc.env.cacheManager
- val context = TaskContext.empty()
- try {
- TaskContext.setTaskContext(context)
- sc.env.blockManager.registerTask(0)
- cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
- assert(context.taskMetrics.updatedBlockStatuses.size === 2)
- } finally {
- TaskContext.unset()
- }
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 662b18f667..fe83fc722a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -80,10 +80,18 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
assert(blockInfoManager.get("block").get eq blockInfo)
- assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
- assert(blockInfoManager.get("block").get eq blockInfo)
assert(blockInfo.readerCount === 0)
assert(blockInfo.writerTask === 1)
+ // Downgrade lock so that second call doesn't block:
+ blockInfoManager.downgradeLock("block")
+ assert(blockInfo.readerCount === 1)
+ assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+ assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ assert(blockInfo.readerCount === 2)
+ assert(blockInfoManager.get("block").get eq blockInfo)
+ assert(blockInfo.readerCount === 2)
+ assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+ blockInfoManager.unlock("block")
blockInfoManager.unlock("block")
assert(blockInfo.readerCount === 0)
assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
@@ -92,6 +100,67 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
}
+ test("lockNewBlockForWriting blocks while write lock is held, then returns false after release") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val lock1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ val lock2Future = Future {
+ withTaskId(2) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.downgradeLock("block")
+ }
+ // After downgrading to a read lock, both threads should wake up and acquire the shared
+ // read lock.
+ assert(!Await.result(lock1Future, 1.seconds))
+ assert(!Await.result(lock2Future, 1.seconds))
+ assert(blockInfoManager.get("block").get.readerCount === 3)
+ }
+
+ test("lockNewBlockForWriting blocks while write lock is held, then returns true after removal") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val lock1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ val lock2Future = Future {
+ withTaskId(2) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+ }
+ }
+ Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
+ withTaskId(0) {
+ blockInfoManager.removeBlock("block")
+ }
+ // After removing the block, the write lock is released. Both threads should wake up but only
+ // one should acquire the write lock. The second thread should block until the winner of the
+ // write race releases its lock.
+ val winningFuture: Future[Boolean] =
+ Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+ assert(winningFuture.value.get.get)
+ val winningTID = blockInfoManager.get("block").get.writerTask
+ assert(winningTID === 1 || winningTID === 2)
+ val losingFuture: Future[Boolean] = if (winningTID == 1) lock2Future else lock1Future
+ assert(!losingFuture.isCompleted)
+ // Once the writer releases its lock, the blocked future should wake up again and complete.
+ withTaskId(winningTID) {
+ blockInfoManager.unlock("block")
+ }
+ assert(!Await.result(losingFuture, 1.seconds))
+ assert(blockInfoManager.get("block").get.readerCount === 1)
+ }
+
test("read locks are reentrant") {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index a94d8b424d..ae1faf5d98 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,7 +190,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
- stores.head.releaseLock(blockId)
val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
stores.foreach { _.removeBlock(blockId) }
master.removeBlock(blockId)
@@ -252,7 +251,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// Insert a block with 2x replication and return the number of copies of the block
def replicateAndGetNumCopies(blockId: String): Int = {
store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
- store.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@@ -290,7 +288,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
- initialStores.head.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@@ -358,7 +355,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
- stores(0).releaseLock(blockId)
// Assert that master knows two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -397,7 +393,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
(1 to 10).foreach {
i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
- testStore.releaseLock(s"dummy-block-$i")
}
(1 to 10).foreach {
i => testStore.removeBlock(s"dummy-block-$i")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e4ab9ee0eb..89b427049b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
@@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
@@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory.
- store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.getLocations("nonrddblock") should have size (1)
}
- store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = true)
store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
@@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// insert broadcast blocks in both the stores
Seq(driverStore, executorStore).foreach { case s =>
- s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
- s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
- s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
- s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+ s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
}
// verify whether the blocks exist in both the stores
@@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
override def run() {
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
}
}
val t3 = new Thread {
@@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
@@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store2 = makeBlockManager(8000, "executor2")
store3 = makeBlockManager(8000, "executor3")
val list1 = List(new Array[Byte](4000))
- store2.putIteratorAndReleaseLock(
+ store2.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store3.putIteratorAndReleaseLock(
+ store3.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
store2.stop()
@@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingleAndReleaseLock("a1", a1, storageLevel)
- store.putSingleAndReleaseLock("a2", a2, storageLevel)
- store.putSingleAndReleaseLock("a3", a3, storageLevel)
+ store.putSingle("a1", a1, storageLevel)
+ store.putSingle("a2", a2, storageLevel)
+ store.putSingle("a3", a3, storageLevel)
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingleAndReleaseLock("a1", a1, storageLevel)
+ store.putSingle("a1", a1, storageLevel)
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
@@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
@@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("in-memory LRU for partitions of multiple RDDs") {
store = makeBlockManager(12000)
- store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Do a get() on rdd_0_2 so that it is the most recently used item
assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
- store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
- store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+ store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+ store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+ store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
@@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingleAndReleaseLock("a1", a1, storageLevel)
- store.putSingleAndReleaseLock("a2", a2, storageLevel)
- store.putSingleAndReleaseLock("a3", a3, storageLevel)
+ store.putSingle("a1", a1, storageLevel)
+ store.putSingle("a2", a2, storageLevel)
+ store.putSingle("a3", a3, storageLevel)
assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
val a4 = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
- store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
- store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
- store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
@@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
@@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
    // At this point list2 was gotten last, so LRU will get rid of list3
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
@@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
// First store list1 and list2, both in memory, and list3, on disk only
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val listForSizeEstimate = new ArrayBuffer[Any]
listForSizeEstimate ++= list1.iterator
@@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
@@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("overly large block") {
store = makeBlockManager(5000)
- store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
- store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
}
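
The test above checks the size fallback: a value that cannot fit in the memory budget is simply not stored under MEMORY_ONLY, but lands on disk under MEMORY_AND_DISK. A hedged sketch of that decision (the Level ADT, budget parameter and returned labels are assumptions, not BlockManager's types):

    object OverlyLargeBlockSketch {
      sealed trait Level
      case object MemoryOnly extends Level
      case object MemoryAndDisk extends Level

      // Where an overly large value ends up, given a fixed memory budget.
      def placement(sizeBytes: Long, memoryBudgetBytes: Long, level: Level): String =
        if (sizeBytes <= memoryBudgetBytes) "memory"
        else level match {
          case MemoryOnly    => "dropped"   // the put fails; a later get returns None
          case MemoryAndDisk => "disk"      // too big for memory, so spill to disk
        }
    }

Here placement(10000, 5000, MemoryOnly) yields "dropped" and placement(10000, 5000, MemoryAndDisk) yields "disk", mirroring a1 and a2 above.
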
@@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
try {
conf.set("spark.shuffle.compress", "true")
store = makeBlockManager(20000, "exec1")
- store.putSingleAndReleaseLock(
+ store.putSingle(
ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
@@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.shuffle.compress", "false")
store = makeBlockManager(20000, "exec2")
- store.putSingleAndReleaseLock(
+ store.putSingle(
ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
"shuffle_0_0_0 was compressed")
@@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "true")
store = makeBlockManager(20000, "exec3")
- store.putSingleAndReleaseLock(
+ store.putSingle(
BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
"broadcast_0 was not compressed")
@@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "false")
store = makeBlockManager(20000, "exec4")
- store.putSingleAndReleaseLock(
+ store.putSingle(
BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
store.stop()
@@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.rdd.compress", "true")
store = makeBlockManager(20000, "exec5")
- store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "false")
store = makeBlockManager(20000, "exec6")
- store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
store = makeBlockManager(20000, "exec7")
- store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
store.stop()
store = null
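
These hunks exercise per-block-type compression flags. A hedged sketch of the decision they assert on; the spark.*.compress keys are the ones the test sets, but the helper, its defaults and the string-prefix dispatch are illustrative assumptions rather than BlockManager's actual code:

    object CompressionSketch {
      // Whether a block's bytes should be compressed, decided by block type.
      def shouldCompress(blockId: String, conf: Map[String, String]): Boolean = {
        def flag(key: String, default: Boolean): Boolean =
          conf.get(key).map(_.toBoolean).getOrElse(default)
        if (blockId.startsWith("shuffle_"))        flag("spark.shuffle.compress", default = true)
        else if (blockId.startsWith("broadcast_")) flag("spark.broadcast.compress", default = true)
        else if (blockId.startsWith("rdd_"))       flag("spark.rdd.compress", default = false)
        else false                                  // other block types stay uncompressed
      }
    }

The final branch matches the last assertion above: a block like "other_block" is kept uncompressed regardless of configuration.
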
@@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
class UnserializableClass
val a1 = new UnserializableClass
intercept[java.io.NotSerializableException] {
- store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+ store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
}
// Make sure get a1 doesn't hang and returns None.
@@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list1)
val updatedBlocks1 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks1.size === 1)
@@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list2)
val updatedBlocks2 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
assert(updatedBlocks2.size === 1)
@@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks3.size === 2)
@@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks4.size === 2)
@@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// No updated blocks - list5 is too big to fit in store and nothing is kicked out
val updatedBlocks5 = getUpdatedBlocks {
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks5.size === 0)
@@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](2000))
    // Tell master. By LRU, only list2 and list3 remain.
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
@@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
    // This time don't tell master and see what happens. By LRU, only list5 and list6 remain.
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
// getLocations should return nothing because the master is not informed
@@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](100))
// insert some blocks
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
@@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
=== 1)
// insert some more blocks
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.putIteratorAndReleaseLock(
+ store.putIterator(
"newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// getLocations and getBlockStatus should yield the same locations
@@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
- store.putIteratorAndReleaseLock(
+ store.putIterator(
blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
@@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = makeBlockManager(12000)
- store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
- store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
- store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    // rdd_1_0 should have been replaced, even though it's not the least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
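
This regression test relies on the "same-RDD rule": when evicting to make room for a new partition of an RDD, other partitions of that same RDD are never candidates, so rdd_1_0 goes even though it was read more recently than rdd_0_0. A hedged, self-contained sketch of that selection (block ids are assumed to follow the rdd_<rddId>_<partition> pattern; this is not MemoryStore's code):

    import scala.collection.mutable

    object SameRddRuleSketch {
      def rddIdOf(blockId: String): Option[String] = blockId.split("_") match {
        case Array("rdd", rddId, _) => Some(rddId)
        case _                      => None
      }

      def selectBlocksToEvict(
          lruOrder: Seq[String],            // least recently used first
          incomingBlockId: String,
          bytesNeeded: Long,
          sizeOf: String => Long): Seq[String] = {
        val incomingRdd = rddIdOf(incomingBlockId)
        val selected = mutable.ArrayBuffer.empty[String]
        var freed = 0L
        for (id <- lruOrder if freed < bytesNeeded) {
          // Same-RDD rule: never evict another partition of the RDD being cached.
          if (incomingRdd.isEmpty || rddIdOf(id) != incomingRdd) {
            selected += id
            freed += sizeOf(id)
          }
        }
        selected.toSeq
      }
    }

For example, selectBlocksToEvict(Seq("rdd_0_0", "rdd_1_0"), "rdd_0_1", 4000, _ => 4000L) returns Seq("rdd_1_0"): rdd_0_0 is skipped despite being the LRU block, matching the assertions above.
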
@@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
memoryStore.releasePendingUnrollMemoryForThisTask()
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
- store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
    // In the meantime, however, we kicked out someBlock2 before giving up.
- store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
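
These unroll tests check the two possible outcomes of safe unrolling: when the whole block fits within the unroll-memory budget the caller gets back fully materialized values (verifyUnroll with shouldBeArray = true), otherwise it gets an iterator over what was buffered plus the remainder. A hedged sketch of that shape, assuming a caller-supplied size estimator; the real MemoryStore reserves and tracks per-task unroll memory, which this omits, and its unrollSafely signature differs:

    import scala.collection.mutable.ArrayBuffer

    object UnrollSketch {
      def unrollSafely(
          values: Iterator[Any],
          budgetBytes: Long,
          sizeOf: Any => Long): Either[Iterator[Any], Array[Any]] = {
        val buffered = new ArrayBuffer[Any]
        var used = 0L
        while (values.hasNext && used <= budgetBytes) {
          val v = values.next()
          buffered += v
          used += sizeOf(v)
        }
        if (!values.hasNext && used <= budgetBytes) {
          Right(buffered.toArray)            // everything fit: fully unrolled
        } else {
          Left(buffered.iterator ++ values)  // over budget: hand back an iterator
        }
      }
    }

Returning an iterator rather than failing outright is what lets a caller fall back to another store (or give up) without having buffered the entire block in memory first.
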
@@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// would not know how to drop them from memory later.
memoryStore.remove("b1")
memoryStore.remove("b2")
- store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
- store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
+ store.putIterator("b1", smallIterator, memOnly)
+ store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.contains("b3"))
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
memoryStore.remove("b3")
- store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
+ store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
- store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
+ store.putIterator("b1", smallIterator, memAndDisk)
+ store.putIterator("b2", smallIterator, memAndDisk)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(!diskStore.contains("b2"))
assert(!diskStore.contains("b3"))
memoryStore.remove("b3")
- store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(12000)
val arr = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
- store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
- store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
// This put should fail because both a1 and a2 should be read-locked:
- store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a3").isEmpty, "a3 was in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
@@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.releaseLock("a2")
// Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
// block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
- store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a2").isEmpty, "a2 was in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
@@ -1335,41 +1335,6 @@ private object BlockManagerSuite {
private implicit class BlockManagerTestUtils(store: BlockManager) {
- def putSingleAndReleaseLock(
- block: BlockId,
- value: Any,
- storageLevel: StorageLevel,
- tellMaster: Boolean): Unit = {
- if (store.putSingle(block, value, storageLevel, tellMaster)) {
- store.releaseLock(block)
- }
- }
-
- def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
- if (store.putSingle(block, value, storageLevel)) {
- store.releaseLock(block)
- }
- }
-
- def putIteratorAndReleaseLock(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Unit = {
- if (store.putIterator(blockId, values, level)) {
- store.releaseLock(blockId)
- }
- }
-
- def putIteratorAndReleaseLock(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- tellMaster: Boolean): Unit = {
- if (store.putIterator(blockId, values, level, tellMaster)) {
- store.releaseLock(blockId)
- }
- }
-
def dropFromMemoryIfExists(
blockId: BlockId,
data: () => Either[Array[Any], ByteBuffer]): Unit = {
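
The deleted helpers above all followed one wrapper pattern: perform the put and, only if it succeeded, release the write lock that the old put API left held by the calling task. With this change the put methods are assumed to manage that lock themselves, so the tests call putSingle/putIterator directly and the wrappers go away. For context, a hedged sketch of the retired pattern against a hypothetical Store trait (names here are illustrative, not Spark's API):

    object StoreTestUtilsSketch {
      trait Store {
        def put(blockId: String, value: Any): Boolean
        def releaseLock(blockId: String): Unit
      }

      // Enrichment-style test helper: put, then release the lock only on success.
      implicit class StoreTestOps(store: Store) {
        def putAndReleaseLock(blockId: String, value: Any): Unit = {
          if (store.put(blockId, value)) {
            store.releaseLock(blockId)
          }
        }
      }
    }
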
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 3d9c085013..e22e320b17 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
- } else {
- blockManager.releaseLock(blockId)
}
BlockManagerBasedStoreResult(blockId, numRecords)
}
@@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
- } else {
- blockManager.releaseLock(blockId)
}
}
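
The same simplification reaches the streaming receivers: both handlers above keep the store-or-fail check but drop the success-path releaseLock, on the assumption that a successful put no longer leaves a lock for the caller to release. A minimal sketch of the remaining control flow (storeOrThrow and the plain RuntimeException are illustrative; the real handlers throw SparkException):

    object ReceivedBlockSketch {
      def storeOrThrow(putSucceeded: Boolean, blockId: String, storageLevel: String): Unit = {
        if (!putSucceeded) {
          throw new RuntimeException(
            s"Could not store $blockId to block manager with storage level $storageLevel")
        }
      }
    }
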