-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/MemoryManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 612
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStore.scala | 63
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 114
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala (renamed from core/src/main/scala/org/apache/spark/storage/MemoryStore.scala) | 170
-rw-r--r--  core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 74
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala | 62
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 10
17 files changed, 536 insertions(+), 613 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index dabc810018..550e1ba6d3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
- blockManager.getLocal(broadcastId).map(_.data.next()) match {
+ blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
case Some(x) =>
releaseLock(broadcastId)
x.asInstanceOf[T]
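
A note on the renamed accessor: getLocalValues returns an Option[BlockResult] whose data field is an iterator of deserialized objects, so TorrentBroadcast reads one element and then drops its read lock. Below is a self-contained sketch of that read-one-value-then-release pattern; CacheStub is a hypothetical stand-in for the BlockManager, not Spark's API.

// Self-contained sketch of the pattern used by TorrentBroadcast above.
// CacheStub is hypothetical; only the calling shape mirrors the diff.
object ReadBroadcastSketch {
  final class CacheStub(values: Map[String, Iterator[Any]]) {
    def getLocalValues(id: String): Option[Iterator[Any]] = values.get(id)
    def releaseLock(id: String): Unit = println(s"released read lock on $id")
  }

  def readBroadcast[T](cache: CacheStub, id: String): Option[T] =
    cache.getLocalValues(id).map(_.next()) match {
      case Some(x) =>
        cache.releaseLock(id)   // the value has been materialized, so the lock can go
        Some(x.asInstanceOf[T])
      case None =>
        None                    // not cached locally; the real code would fetch the blocks
    }

  def main(args: Array[String]): Unit = {
    val cache = new CacheStub(Map("broadcast_0" -> Iterator[Any]("hello")))
    println(readBroadcast[String](cache, "broadcast_0"))   // Some(hello)
  }
}
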
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b5adbd88a2..e89b03e38b 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator
@@ -113,6 +114,7 @@ private[spark] abstract class MemoryManager(
/**
* Release all memory for the given task and mark it as inactive (e.g. when a task ends).
+ *
* @return the number of bytes freed.
*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6a88966f60..1d376adf1a 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
/**
* Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -55,6 +56,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ *
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b38e2ec57f..873330e136 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,24 +40,15 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.storage.memory._
import org.apache.spark.util._
-private[spark] sealed trait BlockValues
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
-
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
val readMethod: DataReadMethod.Value,
val bytes: Long)
-// Class for representing return value of doPut()
-private sealed trait DoPutResult
-private case object DoPutSucceeded extends DoPutResult
-private case object DoPutBytesFailed extends DoPutResult
-private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
-
/**
* Manager running on every node (driver and executors) which provides interfaces for putting and
* retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -78,7 +69,15 @@ private[spark] class BlockManager(
numUsableCores: Int)
extends BlockDataManager with Logging {
- val diskBlockManager = new DiskBlockManager(this, conf)
+ private[spark] val externalShuffleServiceEnabled =
+ conf.getBoolean("spark.shuffle.service.enabled", false)
+
+ val diskBlockManager = {
+ // Only perform cleanup if an external service is not serving our shuffle files.
+ val deleteFilesOnStop =
+ !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
+ new DiskBlockManager(conf, deleteFilesOnStop)
+ }
private[storage] val blockInfoManager = new BlockInfoManager
@@ -86,8 +85,8 @@ private[spark] class BlockManager(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
// Actual storage of where blocks are kept
- private[spark] val memoryStore = new MemoryStore(this, memoryManager)
- private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+ private[spark] val memoryStore = new MemoryStore(conf, this, memoryManager)
+ private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
memoryManager.setMemoryStore(memoryStore)
// Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -96,9 +95,6 @@ private[spark] class BlockManager(
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxStorageMemory
- private[spark]
- val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-
// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
@@ -285,13 +281,9 @@ private[spark] class BlockManager(
if (blockId.isShuffle) {
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
- val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
- .asInstanceOf[Option[ByteBuffer]]
- if (blockBytesOpt.isDefined) {
- val buffer = blockBytesOpt.get
- new BlockManagerManagedBuffer(this, blockId, buffer)
- } else {
- throw new BlockNotFoundException(blockId.toString)
+ getLocalBytes(blockId) match {
+ case Some(buffer) => new BlockManagerManagedBuffer(this, blockId, buffer)
+ case None => throw new BlockNotFoundException(blockId.toString)
}
}
}
@@ -407,11 +399,71 @@ private[spark] class BlockManager(
}
/**
- * Get block from local block manager.
+ * Get block from local block manager as an iterator of Java objects.
*/
- def getLocal(blockId: BlockId): Option[BlockResult] = {
+ def getLocalValues(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
- doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+ blockInfoManager.lockForReading(blockId) match {
+ case None =>
+ logDebug(s"Block $blockId was not found")
+ None
+ case Some(info) =>
+ val level = info.level
+ logDebug(s"Level for block $blockId is $level")
+ if (level.useMemory && memoryStore.contains(blockId)) {
+ val iter: Iterator[Any] = if (level.deserialized) {
+ memoryStore.getValues(blockId).get
+ } else {
+ dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+ }
+ val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+ Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
+ } else if (level.useDisk && diskStore.contains(blockId)) {
+ val iterToReturn: Iterator[Any] = {
+ val diskBytes = diskStore.getBytes(blockId)
+ if (level.deserialized) {
+ val diskIterator = dataDeserialize(blockId, diskBytes)
+ if (level.useMemory) {
+ // Cache the values before returning them
+ memoryStore.putIterator(blockId, diskIterator, level) match {
+ case Left(iter) =>
+ // The memory store put() failed, so it returned the iterator back to us:
+ iter
+ case Right(_) =>
+ // The put() succeeded, so we can read the values back:
+ memoryStore.getValues(blockId).get
+ }
+ } else {
+ diskIterator
+ }
+ } else { // storage level is serialized
+ if (level.useMemory) {
+ // Cache the bytes back into memory to speed up subsequent reads.
+ val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we
+ // cannot put it into MemoryStore, copyForMemory should not be created. That's why
+ // this action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+ copyForMemory.put(diskBytes)
+ })
+ if (putSucceeded) {
+ dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+ } else {
+ dataDeserialize(blockId, diskBytes)
+ }
+ } else {
+ dataDeserialize(blockId, diskBytes)
+ }
+ }
+ }
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } else {
+ releaseLock(blockId)
+ throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+ }
+ }
}
/**
@@ -428,77 +480,44 @@ private[spark] class BlockManager(
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
- doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
- }
- }
-
- private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
- blockInfoManager.lockForReading(blockId) match {
- case None =>
- logDebug(s"Block $blockId was not found")
- None
- case Some(info) =>
- doGetLocal(blockId, info, asBlockResult)
+ blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}
}
/**
- * Get a local block from the block manager.
- * Assumes that the caller holds a read lock on the block.
+ * Get block from the local block manager as serialized bytes.
+ *
+ * Must be called while holding a read lock on the block.
+ * Releases the read lock upon exception; keeps the read lock upon successful return.
*/
- private def doGetLocal(
- blockId: BlockId,
- info: BlockInfo,
- asBlockResult: Boolean): Option[Any] = {
+ private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
val level = info.level
logDebug(s"Level for block $blockId is $level")
-
- // Look for the block in memory
- if (level.useMemory) {
- logDebug(s"Getting block $blockId from memory")
- val result = if (asBlockResult) {
- memoryStore.getValues(blockId).map { iter =>
- val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
- new BlockResult(ci, DataReadMethod.Memory, info.size)
- }
+ // In order, try to read the serialized bytes from memory, then from disk, then fall back to
+ // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
+ if (level.deserialized) {
+ // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
+ if (level.useDisk && diskStore.contains(blockId)) {
+ // Note: we purposely do not try to put the block back into memory here. Since this branch
+ // handles deserialized blocks, this block may only be cached in memory as objects, not
+ // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
+ // cache the block's deserialized objects since that caching may not have a payoff.
+ diskStore.getBytes(blockId)
+ } else if (level.useMemory && memoryStore.contains(blockId)) {
+ // The block was not found on disk, so serialize an in-memory copy:
+ dataSerialize(blockId, memoryStore.getValues(blockId).get)
} else {
- memoryStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in memory")
- }
- }
-
- // Look for block on disk, potentially storing it back in memory if required
- if (level.useDisk) {
- logDebug(s"Getting block $blockId from disk")
- val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
- case Some(b) => b
- case None =>
- releaseLock(blockId)
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
+ releaseLock(blockId)
+ throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}
- assert(0 == bytes.position())
-
- if (!level.useMemory) {
- // If the block shouldn't be stored in memory, we can just return it
- if (asBlockResult) {
- val iter = CompletionIterator[Any, Iterator[Any]](
- dataDeserialize(blockId, bytes), releaseLock(blockId))
- return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
- } else {
- return Some(bytes)
- }
- } else {
- // Otherwise, we also have to store something in the memory store
- if (!level.deserialized && !asBlockResult) {
- /* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized" and we requested its serialized bytes. */
- memoryStore.putBytes(blockId, bytes.limit, () => {
+ } else { // storage level is serialized
+ if (level.useMemory && memoryStore.contains(blockId)) {
+ memoryStore.getBytes(blockId).get
+ } else if (level.useDisk && diskStore.contains(blockId)) {
+ val bytes = diskStore.getBytes(blockId)
+ if (level.useMemory) {
+ // Cache the bytes back into memory to speed up subsequent reads.
+ val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
// put it into MemoryStore, copyForMemory should not be created. That's why this
@@ -506,39 +525,19 @@ private[spark] class BlockManager(
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
- bytes.rewind()
- }
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- val values = dataDeserialize(blockId, bytes)
- val valuesToReturn: Iterator[Any] = {
- if (level.deserialized) {
- // Cache the values before returning them
- memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
- case Left(iter) =>
- // The memory store put() failed, so it returned the iterator back to us:
- iter
- case Right(_) =>
- // The put() succeeded, so we can read the values back:
- memoryStore.getValues(blockId).get
- }
- } else {
- values
- }
+ if (memoryStorePutSucceeded) {
+ memoryStore.getBytes(blockId).get
+ } else {
+ bytes.rewind()
+ bytes
}
- val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
- return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } else {
+ bytes
}
+ } else {
+ releaseLock(blockId)
+ throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}
- } else {
- // This branch represents a case where the BlockInfoManager contained an entry for
- // the block but the block could not be found in any of the block stores. This case
- // should never occur, but for completeness's sake we address it here.
- logError(
- s"Block $blockId is supposedly stored locally but was not found in any block store")
- releaseLock(blockId)
- None
}
}
@@ -547,17 +546,10 @@ private[spark] class BlockManager(
*
* This does not acquire a lock on this block in this JVM.
*/
- def getRemote(blockId: BlockId): Option[BlockResult] = {
- logDebug(s"Getting remote block $blockId")
- doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
- }
-
- /**
- * Get block from remote block managers as serialized bytes.
- */
- def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
- logDebug(s"Getting remote block $blockId as bytes")
- doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+ def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+ getRemoteBytes(blockId).map { data =>
+ new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
+ }
}
/**
@@ -570,7 +562,11 @@ private[spark] class BlockManager(
preferredLocs ++ otherLocs
}
- private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
+ /**
+ * Get block from remote block managers as serialized bytes.
+ */
+ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+ logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
val locations = getLocations(blockId)
var numFetchFailures = 0
@@ -595,14 +591,7 @@ private[spark] class BlockManager(
}
if (data != null) {
- if (asBlockResult) {
- return Some(new BlockResult(
- dataDeserialize(blockId, data),
- DataReadMethod.Network,
- data.limit()))
- } else {
- return Some(data)
- }
+ return Some(data)
}
logDebug(s"The value of block $blockId is null")
}
@@ -618,12 +607,12 @@ private[spark] class BlockManager(
* automatically be freed once the result's `data` iterator is fully consumed.
*/
def get(blockId: BlockId): Option[BlockResult] = {
- val local = getLocal(blockId)
+ val local = getLocalValues(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
- val remote = getRemote(blockId)
+ val remote = getRemoteValues(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
@@ -673,24 +662,26 @@ private[spark] class BlockManager(
level: StorageLevel,
makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
// Initially we hold no locks on this block.
- doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
- case DoPutSucceeded =>
+ doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+ case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
- val blockResult = get(blockId).getOrElse {
+ val blockResult = getLocalValues(blockId).getOrElse {
// Since we held a read lock between the doPut() and get() calls, the block should not
// have been evicted, so get() not returning the block indicates some internal error.
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
+ // We already hold a read lock on the block from the doPut() call and getLocalValues()
+ // acquires the lock again, so we need to call releaseLock() here so that the net number
+ // of lock acquisitions is 1 (since the caller will only call release() once).
+ releaseLock(blockId)
Left(blockResult)
- case DoPutIteratorFailed(iter) =>
+ case Some(iter) =>
// The put failed, likely because the data was too large to fit in memory and could not be
// dropped to disk. Therefore, we need to pass the input iterator back to the caller so
// that they can decide what to do with the values (e.g. process them without caching).
Right(iter)
- case DoPutBytesFailed =>
- throw new SparkException("doPut returned an invalid failure response")
}
}
@@ -701,16 +692,10 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+ tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
- val result = doPut(
- blockId,
- IteratorValues(() => values),
- level,
- tellMaster,
- effectiveStorageLevel)
- result == DoPutSucceeded
+ // If doPut() didn't hand work back to us, then block already existed or was successfully stored
+ doPutIterator(blockId, () => values, level, tellMaster).isEmpty
}
/**
@@ -739,46 +724,105 @@ private[spark] class BlockManager(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
- tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+ tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
- val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
- result == DoPutSucceeded
+ doPutBytes(blockId, bytes, level, tellMaster)
}
/**
- * Put the given block according to the given level in one of the block stores, replicating
+ * Put the given bytes according to the given level in one of the block stores, replicating
* the values if necessary.
*
* If the block already exists, this method will not overwrite it.
*
- * @param effectiveStorageLevel the level according to which the block will actually be handled.
- * This allows the caller to specify an alternate behavior of doPut
- * while preserving the original level specified by the user.
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
- * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
- * [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
- * [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
+ * @return true if the block was already present or if the put succeeded, false otherwise.
*/
- private def doPut(
+ private def doPutBytes(
blockId: BlockId,
- data: BlockValues,
+ bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None,
- keepReadLock: Boolean = false): DoPutResult = {
+ keepReadLock: Boolean = false): Boolean = {
+ doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ val startTimeMs = System.currentTimeMillis
+ // Since we're storing bytes, initiate the replication before storing them locally.
+ // This is faster as data is already serialized and ready to send.
+ val replicationFuture = if (level.replication > 1) {
+ // Duplicate doesn't copy the bytes, but just creates a wrapper
+ val bufferView = bytes.duplicate()
+ Future {
+ // This is a blocking action and should run in futureExecutionContext which is a cached
+ // thread pool
+ replicate(blockId, bufferView, level)
+ }(futureExecutionContext)
+ } else {
+ null
+ }
+
+ bytes.rewind()
+ val size = bytes.limit()
+
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ val values = dataDeserialize(blockId, bytes.duplicate())
+ memoryStore.putIterator(blockId, values, level).isRight
+ } else {
+ memoryStore.putBytes(blockId, size, () => bytes)
+ }
+ if (!putSucceeded && level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.putBytes(blockId, bytes)
+ }
+ } else if (level.useDisk) {
+ diskStore.putBytes(blockId, bytes)
+ }
+
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
+ // tell the master about it.
+ putBlockInfo.size = size
+ if (tellMaster) {
+ reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+ }
+ }
+ logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+ if (level.replication > 1) {
+ // Wait for asynchronous replication to finish
+ Await.ready(replicationFuture, Duration.Inf)
+ }
+ if (blockWasSuccessfullyStored) {
+ None
+ } else {
+ Some(bytes)
+ }
+ }.isEmpty
+ }
+
+ /**
+ * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
+ *
+ * @param putBody a function which attempts the actual put() and returns None on success
+ * or Some on failure.
+ */
+ private def doPut[T](
+ blockId: BlockId,
+ level: StorageLevel,
+ tellMaster: Boolean,
+ keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
- effectiveStorageLevel.foreach { level =>
- require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
- }
- /* Remember the block's storage level so that we can correctly drop it to disk if it needs
- * to be dropped right after it got put into memory. Note, however, that other threads will
- * not be able to get() this block until we call markReady on its BlockInfo. */
val putBlockInfo = {
val newInfo = new BlockInfo(level, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
@@ -789,138 +833,113 @@ private[spark] class BlockManager(
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
- return DoPutSucceeded
+ return None
}
}
val startTimeMs = System.currentTimeMillis
-
- // Size of the block in bytes
- var size = 0L
-
- // The level we actually use to put the block
- val putLevel = effectiveStorageLevel.getOrElse(level)
-
- // If we're storing bytes, then initiate the replication before storing them locally.
- // This is faster as data is already serialized and ready to send.
- val replicationFuture = data match {
- case b: ByteBufferValues if putLevel.replication > 1 =>
- // Duplicate doesn't copy the bytes, but just creates a wrapper
- val bufferView = b.buffer.duplicate()
- Future {
- // This is a blocking action and should run in futureExecutionContext which is a cached
- // thread pool
- replicate(blockId, bufferView, putLevel)
- }(futureExecutionContext)
- case _ => null
- }
-
- var blockWasSuccessfullyStored = false
- var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
-
- putBlockInfo.synchronized {
- logTrace("Put for block %s took %s to get into synchronized block"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
- try {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- data match {
- case IteratorValues(iterator) =>
- memoryStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) =>
- size = s
- case Left(iter) =>
- iteratorFromFailedMemoryStorePut = Some(iter)
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- memoryStore.putBytes(blockId, bytes, putLevel)
- }
- } else if (putLevel.useDisk) {
- data match {
- case IteratorValues(iterator) =>
- diskStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) =>
- size = s
- // putIterator() will never return Left (see its return type).
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- diskStore.putBytes(blockId, bytes, putLevel)
- }
+ var blockWasSuccessfullyStored: Boolean = false
+ val result: Option[T] = try {
+ val res = putBody(putBlockInfo)
+ blockWasSuccessfullyStored = res.isEmpty
+ res
+ } finally {
+ if (blockWasSuccessfullyStored) {
+ if (keepReadLock) {
+ blockInfoManager.downgradeLock(blockId)
} else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying storage level!")
+ blockInfoManager.unlock(blockId)
}
-
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
- blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
- if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // let other threads read it, and tell the master about it.
- putBlockInfo.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
- }
- }
- } finally {
- if (blockWasSuccessfullyStored) {
- if (keepReadLock) {
- blockInfoManager.downgradeLock(blockId)
- } else {
- blockInfoManager.unlock(blockId)
- }
- } else {
- blockInfoManager.removeBlock(blockId)
- logWarning(s"Putting block $blockId failed")
- }
- }
- }
- logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
- if (replicationFuture != null) {
- // Wait for asynchronous replication to finish
- Await.ready(replicationFuture, Duration.Inf)
- } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
- val remoteStartTime = System.currentTimeMillis
- val bytesToReplicate: ByteBuffer = {
- doGetLocal(blockId, putBlockInfo, asBlockResult = false)
- .map(_.asInstanceOf[ByteBuffer])
- .getOrElse {
- throw new SparkException(s"Block $blockId was not found even though it was just stored")
- }
- }
- try {
- replicate(blockId, bytesToReplicate, putLevel)
- } finally {
- BlockManager.dispose(bytesToReplicate)
+ } else {
+ blockInfoManager.removeBlock(blockId)
+ logWarning(s"Putting block $blockId failed")
}
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
-
- if (putLevel.replication > 1) {
+ if (level.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
+ result
+ }
- if (blockWasSuccessfullyStored) {
- DoPutSucceeded
- } else if (iteratorFromFailedMemoryStorePut.isDefined) {
- DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
- } else {
- DoPutBytesFailed
+ /**
+ * Put the given block according to the given level in one of the block stores, replicating
+ * the values if necessary.
+ *
+ * If the block already exists, this method will not overwrite it.
+ *
+ * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+ * block already exists). If false, this method will hold no locks when it
+ * returns.
+ * @return None if the block was already present or if the put succeeded, or Some(iterator)
+ * if the put failed.
+ */
+ private def doPutIterator(
+ blockId: BlockId,
+ iterator: () => Iterator[Any],
+ level: StorageLevel,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+ doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ val startTimeMs = System.currentTimeMillis
+ var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+ // Size of the block in bytes
+ var size = 0L
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ memoryStore.putIterator(blockId, iterator(), level) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ // Not enough space to unroll this block; drop to disk if applicable
+ if (level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, iter)
+ }
+ size = diskStore.getSize(blockId)
+ } else {
+ iteratorFromFailedMemoryStorePut = Some(iter)
+ }
+ }
+ } else if (level.useDisk) {
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, iterator())
+ }
+ size = diskStore.getSize(blockId)
+ }
+
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
+ // tell the master about it.
+ putBlockInfo.size = size
+ if (tellMaster) {
+ reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+ }
+ logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+ if (level.replication > 1) {
+ val remoteStartTime = System.currentTimeMillis
+ val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+ try {
+ replicate(blockId, bytesToReplicate, level)
+ } finally {
+ BlockManager.dispose(bytesToReplicate)
+ }
+ logDebug("Put block %s remotely took %s"
+ .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+ }
+ }
+ assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
+ iteratorFromFailedMemoryStorePut
}
}
@@ -1077,9 +1096,11 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
- diskStore.putIterator(blockId, elements.toIterator, level)
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+ }
case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
+ diskStore.putBytes(blockId, bytes)
}
blockIsUpdated = true
}
@@ -1229,7 +1250,6 @@ private[spark] class BlockManager(
rpcEnv.stop(slaveEndpoint)
blockInfoManager.clear()
memoryStore.clear()
- diskStore.clear()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
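
The reworked doPut() above is now a template shared by doPutBytes() and doPutIterator(): the caller-supplied putBody returns None on success and Some(data) on failure, while doPut() owns lock acquisition, downgrade or release, and cleanup. A minimal self-contained sketch of that shape follows, using hypothetical names rather than the Spark classes.

// Hypothetical sketch of the doPut() template used above: the body reports
// failure by returning Some(leftover); the wrapper handles the lock.
object PutTemplateSketch {
  final class Lock {
    def acquire(): Unit = println("acquired write lock")
    def downgradeToRead(): Unit = println("downgraded to read lock")
    def release(): Unit = println("released lock")
  }

  def doPut[T](lock: Lock, keepReadLock: Boolean)(putBody: => Option[T]): Option[T] = {
    lock.acquire()
    var stored = false
    try {
      val result = putBody
      stored = result.isEmpty          // None from the body means the block was stored
      result
    } finally {
      if (stored) {
        if (keepReadLock) lock.downgradeToRead() else lock.release()
      } else {
        lock.release()                 // failed put: give the lock back, keep nothing
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val leftover = doPut(new Lock, keepReadLock = false) {
      Some(Iterator(1, 2, 3))          // simulate a put that could not store the data
    }
    println(s"caller got the un-stored data back: ${leftover.isDefined}")
  }
}
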
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
deleted file mode 100644
index b069918b16..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.Logging
-
-/**
- * Abstract class to store blocks.
- */
-private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
-
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
-
- /**
- * Attempt to store an iterator of values.
- *
- * @return an iterator of values (in case the put failed), or the estimated size of the stored
- * values if the put succeeded.
- */
- def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[Iterator[Any], Long]
-
- /**
- * Return the size of a block in bytes.
- */
- def getSize(blockId: BlockId): Long
-
- def getBytes(blockId: BlockId): Option[ByteBuffer]
-
- def getValues(blockId: BlockId): Option[Iterator[Any]]
-
- /**
- * Remove a block, if it exists.
- *
- * @param blockId the block to remove.
- * @return True if the block was found and removed, False otherwise.
- * @throws IllegalStateException if the block is pinned by a task.
- */
- def remove(blockId: BlockId): Boolean
-
- def contains(blockId: BlockId): Boolean
-
- def clear() { }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 4daf22f714..e51d96e57b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -26,18 +26,14 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
- * locations. By default, one block is mapped to one file with a name given by its BlockId.
- * However, it is also possible to have a block map to only a segment of a file, by calling
- * mapBlockToFileSegment().
+ * locations. One block is mapped to one file with a name given by its BlockId.
*
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
-private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
- extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
- private[spark]
- val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+ private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -163,10 +159,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private def doStop(): Unit = {
- // Only perform cleanup if an external service is not serving our shuffle files.
- // Also blockManagerId could be null if block manager is not initialized properly.
- if (!blockManager.externalShuffleServiceEnabled ||
- (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
+ if (deleteFilesOnStop) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
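
With the change above, DiskBlockManager is built from a SparkConf plus an explicit deleteFilesOnStop flag instead of a full BlockManager; the flag itself is computed in BlockManager earlier in this diff. The following is a hedged sketch of constructing one directly under that new signature. The package declaration is only there because the class is private[spark]; the temp directory and block name are illustrative, not taken from the test suite.

package org.apache.spark.storage   // DiskBlockManager is private[spark]

import java.nio.file.Files

import org.apache.spark.SparkConf

// Sketch only: exercises the new (conf, deleteFilesOnStop) constructor from the diff.
object DiskBlockManagerSketch {
  def main(args: Array[String]): Unit = {
    val localDir = Files.createTempDirectory("spark-local").toFile
    val conf = new SparkConf(false).set("spark.local.dir", localDir.getAbsolutePath)
    // deleteFilesOnStop = true mirrors the driver / no-external-shuffle-service case,
    // so the manager removes its local directories when it shuts down.
    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
    val file = diskBlockManager.getFile("test_block")
    println(s"block test_block would be stored at ${file.getAbsolutePath}")
  }
}
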
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e35aa1b068..caecd97a0b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,112 +17,100 @@
package org.apache.spark.storage
-import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
+import java.io.{FileOutputStream, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
-import org.apache.spark.Logging
+import com.google.common.io.Closeables
+
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on disk.
*/
-private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
- extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
- val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+ private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
- override def getSize(blockId: BlockId): Long = {
+ def getSize(blockId: BlockId): Long = {
diskManager.getFile(blockId.name).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val bytes = _bytes.duplicate()
+ /**
+ * Invokes the provided callback function to write the specific block.
+ *
+ * @throws IllegalStateException if the block already exists in the disk store.
+ */
+ def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+ if (contains(blockId)) {
+ throw new IllegalStateException(s"Block $blockId is already present in the disk store")
+ }
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
- val channel = new FileOutputStream(file).getChannel
- Utils.tryWithSafeFinally {
- while (bytes.remaining > 0) {
- channel.write(bytes)
+ val fileOutputStream = new FileOutputStream(file)
+ var threwException: Boolean = true
+ try {
+ writeFunc(fileOutputStream)
+ threwException = false
+ } finally {
+ try {
+ Closeables.close(fileOutputStream, threwException)
+ } finally {
+ if (threwException) {
+ remove(blockId)
+ }
}
- } {
- channel.close()
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+ file.getName,
+ Utils.bytesToString(file.length()),
+ finishTime - startTime))
}
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Right[Iterator[Any], Long] = {
- logDebug(s"Attempting to write values for block $blockId")
- val startTime = System.currentTimeMillis
- val file = diskManager.getFile(blockId)
- val outputStream = new FileOutputStream(file)
- try {
+ def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
+ // So that we do not modify the input offsets !
+ // duplicate does not copy buffer, so inexpensive
+ val bytes = _bytes.duplicate()
+ put(blockId) { fileOutputStream =>
+ val channel = fileOutputStream.getChannel
Utils.tryWithSafeFinally {
- blockManager.dataSerializeStream(blockId, outputStream, values)
+ while (bytes.remaining > 0) {
+ channel.write(bytes)
+ }
} {
- // Close outputStream here because it should be closed before file is deleted.
- outputStream.close()
+ channel.close()
}
- } catch {
- case e: Throwable =>
- if (file.exists()) {
- if (!file.delete()) {
- logWarning(s"Error deleting ${file}")
- }
- }
- throw e
}
-
- val length = file.length
-
- val timeTaken = System.currentTimeMillis - startTime
- logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(length), timeTaken))
-
- Right(length)
}
- private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
+ def getBytes(blockId: BlockId): ByteBuffer = {
+ val file = diskManager.getFile(blockId.name)
val channel = new RandomAccessFile(file, "r").getChannel
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
- if (length < minMemoryMapBytes) {
- val buf = ByteBuffer.allocate(length.toInt)
- channel.position(offset)
+ if (file.length < minMemoryMapBytes) {
+ val buf = ByteBuffer.allocate(file.length.toInt)
+ channel.position(0)
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException("Reached EOF before filling buffer\n" +
- s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+ s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
}
}
buf.flip()
- Some(buf)
+ buf
} else {
- Some(channel.map(MapMode.READ_ONLY, offset, length))
+ channel.map(MapMode.READ_ONLY, 0, file.length)
}
} {
channel.close()
}
}
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = diskManager.getFile(blockId.name)
- getBytes(file, 0, file.length)
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
-
- override def remove(blockId: BlockId): Boolean = {
+ def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
if (file.exists()) {
val ret = file.delete()
@@ -135,7 +123,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
}
}
- override def contains(blockId: BlockId): Boolean = {
+ def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()
}
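
The new DiskStore.put() above accepts a write callback, closes the stream with Guava's Closeables.close (so a failure in the callback is not masked by a secondary close failure), and deletes the partially written file if the callback threw. Below is a self-contained sketch of that idiom outside of Spark; the file path and callback are illustrative, and Guava is assumed on the classpath, as spark-core already has.

// Self-contained sketch of the write-callback idiom from DiskStore.put() above.
import java.io.{File, FileOutputStream}

import com.google.common.io.Closeables

object WriteWithCleanupSketch {
  def writeWithCleanup(file: File)(writeFunc: FileOutputStream => Unit): Unit = {
    val fileOutputStream = new FileOutputStream(file)
    var threwException: Boolean = true
    try {
      writeFunc(fileOutputStream)
      threwException = false
    } finally {
      try {
        // Swallow a close() failure only if the write itself already failed,
        // so the original exception is the one that propagates.
        Closeables.close(fileOutputStream, threwException)
      } finally {
        if (threwException && file.exists() && !file.delete()) {
          System.err.println(s"Could not delete partially written file $file")
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("disk-put-sketch", ".bin")
    writeWithCleanup(file) { out => out.write(Array[Byte](1, 2, 3)) }
    println(s"wrote ${file.length()} bytes to ${file.getAbsolutePath}")
  }
}
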
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index bb72fe4bca..a80b2357ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.storage
+package org.apache.spark.storage.memory
import java.nio.ByteBuffer
import java.util.LinkedHashMap
@@ -23,8 +23,9 @@ import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.TaskContext
+import org.apache.spark.{Logging, SparkConf, TaskContext}
import org.apache.spark.memory.MemoryManager
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
@@ -34,13 +35,15 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
* serialized ByteBuffers.
*/
-private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
- extends BlockStore(blockManager) {
+private[spark] class MemoryStore(
+ conf: SparkConf,
+ blockManager: BlockManager,
+ memoryManager: MemoryManager)
+ extends Logging {
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
- private val conf = blockManager.conf
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
@@ -81,32 +84,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
memoryUsed - currentUnrollMemory
}
- override def getSize(blockId: BlockId): Long = {
+ def getSize(blockId: BlockId): Long = {
entries.synchronized {
entries.get(blockId).size
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
- require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- // Work on a duplicate - since the original input might be used elsewhere.
- val bytes = _bytes.duplicate()
- bytes.rewind()
- if (level.deserialized) {
- val values = blockManager.dataDeserialize(blockId, bytes)
- putIterator(blockId, values, level)
- } else {
- tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- }
- }
-
/**
* Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
* put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
*
* The caller should guarantee that `size` is correct.
+ *
+ * @return true if the put() succeeded, false otherwise.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
@@ -114,89 +106,70 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (putSuccess) {
assert(bytes.limit == size)
}
- }
-
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[Iterator[Any], Long] = {
- require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- putIterator(blockId, values, level, allowPersistToDisk = true)
+ putSuccess
}
/**
* Attempt to put the given block in memory store.
*
- * There may not be enough space to fully unroll the iterator in memory, in which case we
- * optionally drop the values to disk if
- * (1) the block's storage level specifies useDisk, and
- * (2) `allowPersistToDisk` is true.
- *
- * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
- * back from disk and attempts to cache it in memory. In this case, we should not persist the
- * block back on disk again, as it is already in disk store.
+ * @return the estimated size of the stored data if the put() succeeded, or an iterator
+ * in case the put() failed (the returned iterator lets callers fall back to the disk
+ * store if desired).
*/
private[storage] def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel,
- allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+ level: StorageLevel): Either[Iterator[Any], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
- val size = {
- if (level.deserialized) {
- val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
- tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
- sizeEstimate
+ if (level.deserialized) {
+ val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+ if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) {
+ Right(sizeEstimate)
} else {
- val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
- tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- bytes.limit()
+ Left(arrayValues.toIterator)
}
- }
- Right(size)
- case Right(iteratorValues) =>
- // Not enough space to unroll this block; drop to disk if applicable
- if (level.useDisk && allowPersistToDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- blockManager.diskStore.putIterator(blockId, iteratorValues, level)
} else {
- Left(iteratorValues)
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+ if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) {
+ Right(bytes.limit())
+ } else {
+ Left(arrayValues.toIterator)
+ }
}
+ case Right(iteratorValues) =>
+ Left(iteratorValues)
}
}
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
None
- } else if (entry.deserialized) {
- Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
} else {
+ require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form")
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
}
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ def getValues(blockId: BlockId): Option[Iterator[Any]] = {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
None
- } else if (entry.deserialized) {
- Some(entry.value.asInstanceOf[Array[Any]].iterator)
} else {
- val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
- Some(blockManager.dataDeserialize(blockId, buffer))
+ require(entry.deserialized, "should only call getValues on deserialized blocks")
+ Some(entry.value.asInstanceOf[Array[Any]].iterator)
}
}
- override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
+ def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
val entry = entries.synchronized {
entries.remove(blockId)
}
@@ -210,7 +183,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def clear(): Unit = memoryManager.synchronized {
+ def clear(): Unit = memoryManager.synchronized {
entries.synchronized {
entries.clear()
}
@@ -323,14 +296,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
- * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
- * created to avoid OOM since it may be a big ByteBuffer.
- *
- * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
- * dropping is done by only on thread at a time. Otherwise while one thread is dropping
- * blocks to free memory for one block, another thread may use up the freed space for
- * another block.
- *
* @return whether put was successful.
*/
private def tryToPut(
@@ -338,42 +303,33 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
value: () => Any,
size: Long,
deserialized: Boolean): Boolean = {
+ val acquiredEnoughStorageMemory = {
+ // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another
+ // task.
+ memoryManager.synchronized {
+ // Note: if we have previously unrolled this block successfully, then pending unroll
+ // memory should be non-zero. This is the amount that we already reserved during the
+ // unrolling process. In this case, we can just reuse this space to cache our block.
+ // The synchronization on `memoryManager` here guarantees that the release and acquire
+ // happen atomically. This relies on the assumption that all memory acquisitions are
+ // synchronized on the same lock.
+ releasePendingUnrollMemoryForThisTask()
+ memoryManager.acquireStorageMemory(blockId, size)
+ }
+ }
- /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
- * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
- * been released, it must be ensured that those to-be-dropped blocks are not double counted
- * for freeing up more space for another block that needs to be put. Only then the actually
- * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
-
- memoryManager.synchronized {
- // Note: if we have previously unrolled this block successfully, then pending unroll
- // memory should be non-zero. This is the amount that we already reserved during the
- // unrolling process. In this case, we can just reuse this space to cache our block.
- // The synchronization on `memoryManager` here guarantees that the release and acquire
- // happen atomically. This relies on the assumption that all memory acquisitions are
- // synchronized on the same lock.
- releasePendingUnrollMemoryForThisTask()
- val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
- if (enoughMemory) {
- // We acquired enough memory for the block, so go ahead and put it
- val entry = new MemoryEntry(value(), size, deserialized)
- entries.synchronized {
- entries.put(blockId, entry)
- }
- val valuesOrBytes = if (deserialized) "values" else "bytes"
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
- blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
- } else {
- // Tell the block manager that we couldn't put it in memory so that it can drop it to
- // disk if the block allows disk storage.
- lazy val data = if (deserialized) {
- Left(value().asInstanceOf[Array[Any]])
- } else {
- Right(value().asInstanceOf[ByteBuffer].duplicate())
- }
- blockManager.dropFromMemory(blockId, () => data)
+ if (acquiredEnoughStorageMemory) {
+ // We acquired enough memory for the block, so go ahead and put it
+ val entry = new MemoryEntry(value(), size, deserialized)
+ entries.synchronized {
+ entries.put(blockId, entry)
}
- enoughMemory
+ val valuesOrBytes = if (deserialized) "values" else "bytes"
+ logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+ blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+ true
+ } else {
+ false
}
}
@@ -455,7 +411,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def contains(blockId: BlockId): Boolean = {
+ def contains(blockId: BlockId): Boolean = {
entries.synchronized { entries.containsKey(blockId) }
}
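
After this change MemoryStore.putIterator() reports success as Right(estimatedSize) and failure as Left(iterator), handing the values back so the caller (doPutIterator() above) can decide whether to spill to disk. A minimal self-contained sketch of consuming that Either follows; memoryPut and diskPut are hypothetical stand-ins, not the real stores.

// Sketch of the Either[Iterator[Any], Long] contract used by putIterator() above.
object PutIteratorFallbackSketch {
  def memoryPut(values: Iterator[Any]): Either[Iterator[Any], Long] = {
    val unrolled = values.toVector                       // pretend to unroll the block in memory
    if (unrolled.size <= 3) Right(unrolled.size.toLong)  // it fit: report the stored size
    else Left(unrolled.iterator)                         // too large: hand the values back
  }

  def diskPut(values: Iterator[Any]): Long = values.size.toLong  // pretend to write to disk

  def put(values: Iterator[Any], useDisk: Boolean): Option[Long] =
    memoryPut(values) match {
      case Right(size) => Some(size)                       // cached in memory
      case Left(leftover) if useDisk => Some(diskPut(leftover))  // spill to disk instead
      case Left(_) => None                                 // nothing was cached
    }

  def main(args: Array[String]): Unit = {
    println(put(Iterator.range(0, 2), useDisk = false))    // Some(2): fit in "memory"
    println(put(Iterator.range(0, 10), useDisk = true))    // Some(10): fell back to "disk"
  }
}
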
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 88fdbbdaec..f97cfbba32 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -37,7 +37,7 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
rdd.map { x =>
val bm = SparkEnv.get.blockManager
// Check if broadcast block was fetched
- val isFound = bm.getLocal(BroadcastBlockId(bid)).isDefined
+ val isFound = bm.getLocalValues(BroadcastBlockId(bid)).isDefined
(x, isFound)
}.collect().toSet
}
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index d9764c7c10..686e948b5d 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -31,7 +31,8 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel}
+import org.apache.spark.storage.memory.MemoryStore
/**
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index eee78d396e..741d4fdf78 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import org.mockito.Mockito.when
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9686c6621b..9001a26652 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
private val dummyBlock = TestBlockId("--")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index ae1faf5d98..b78a3648cd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -366,7 +366,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
testStore => blockLocations.contains(testStore.blockManagerId.executorId)
}.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
- assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+ assert(
+ testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
testStore.releaseLock(blockId)
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0485b0501c..42595c8cf2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,8 +17,7 @@
package org.apache.spark.storage
-import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.Arrays
+import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -614,11 +613,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("a3", a3, storageLevel)
assert(accessMethod("a2").isDefined, "a2 was not in store")
assert(accessMethod("a3").isDefined, "a3 was not in store")
- assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
assert(accessMethod("a1").isDefined, "a1 was not in store")
val dataShouldHaveBeenCachedBackIntoMemory = {
- if (storageLevel.deserialized) !getAsBytes
- else getAsBytes
+ if (storageLevel.deserialized) {
+ !getAsBytes
+ } else {
+ // If the block's storage level is serialized, then always cache the bytes in memory, even
+ // if the caller requested values.
+ true
+ }
}
if (dataShouldHaveBeenCachedBackIntoMemory) {
assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
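
The condition asserted here can be read as a small policy function. The restatement below is hypothetical; the names and the rationale comment reflect my reading of the test, not a documented BlockManager API:

// Hypothetical restatement of the asserted condition; not a BlockManager API.
object ReCachePolicySketch {
  // After a block has been dropped to disk and is read again, should the read path
  // cache it back into memory?
  //  - Serialized levels: yes, the fetched bytes are already in the form the memory
  //    store keeps, whether the caller asked for bytes or values.
  //  - Deserialized levels: only a values read re-caches, since a bytes read never
  //    materializes the values that a deserialized memory entry would need.
  // (This rationale is an interpretation of the test above, not documented behaviour.)
  def cacheBackIntoMemory(levelIsDeserialized: Boolean, readAsBytes: Boolean): Boolean =
    if (levelIsDeserialized) !readAsBytes else true
}
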
@@ -735,7 +738,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
- assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+ assert(!store.memoryStore.contains("a2"), "a2 was in memory store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
}
@@ -829,50 +832,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}
- test("reads of memory-mapped and non memory-mapped files are equivalent") {
- val confKey = "spark.storage.memoryMapThreshold"
-
- // Create a non-trivial (not all zeros) byte array
- var counter = 0.toByte
- def incr: Byte = {counter = (counter + 1).toByte; counter;}
- val bytes = Array.fill[Byte](1000)(incr)
- val byteBuffer = ByteBuffer.wrap(bytes)
-
- val blockId = BlockId("rdd_1_2")
-
- // This sequence of mocks makes these tests fairly brittle. It would
- // be nice to refactor classes involved in disk storage in a way that
- // allows for easier testing.
- val blockManager = mock(classOf[BlockManager])
- when(blockManager.conf).thenReturn(conf.clone.set(confKey, "0"))
- val diskBlockManager = new DiskBlockManager(blockManager, conf)
-
- val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
- diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
- val mapped = diskStoreMapped.getBytes(blockId).get
-
- when(blockManager.conf).thenReturn(conf.clone.set(confKey, "1m"))
- val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
- diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
- val notMapped = diskStoreNotMapped.getBytes(blockId).get
-
- // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
- assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
- "Expected HeapByteBuffer for un-mapped read")
- assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
-
- def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
- val array = new Array[Byte](in.remaining())
- in.get(array)
- array
- }
-
- val mappedAsArray = arrayFromByteBuffer(mapped)
- val notMappedAsArray = arrayFromByteBuffer(notMapped)
- assert(Arrays.equals(mappedAsArray, bytes))
- assert(Arrays.equals(notMappedAsArray, bytes))
- }
-
test("updated block statuses") {
store = makeBlockManager(12000)
store.registerTask(0)
@@ -1232,19 +1191,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- // Unroll huge block with not enough space. This should fail and drop the new block to disk
- // directly in addition to kicking out b2 in the process. Memory store should contain only
- // b3, while disk store should contain b1, b2 and b4.
+ // Unroll huge block with not enough space. This should fail and return an iterator so that
+ // the block may be stored to disk. During the unrolling process, block "b2" should be kicked
+ // out, so the memory store should contain only b3, while the disk store should contain
+ // b1, b2 and b4.
val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
- assert(result4.isRight)
+ assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
- assert(diskStore.contains("b1"))
- assert(diskStore.contains("b2"))
- assert(!diskStore.contains("b3"))
- assert(diskStore.contains("b4"))
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
}
@@ -1366,7 +1322,7 @@ private object BlockManagerSuite {
getLocalAndReleaseLock(blockId).isDefined
}
- val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+ val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
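
The rewritten unroll assertions above rely on putIterator's new contract: when the block does not fit, the memory store hands the values back as an iterator (Left) and the caller decides whether to spill them to disk, instead of the store writing to disk itself. A minimal sketch of that contract, with hypothetical names and naive size accounting rather than Spark's actual signature:

// Minimal sketch of the Either-based contract the rewritten unroll test checks.
// putIteratorSketch and its size accounting are illustrative, not the MemoryStore API.
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // Try to materialize `values` within `budgetBytes`; Right(size) on success,
  // Left(remaining values) on failure so the caller can spill them elsewhere.
  def putIteratorSketch[T](
      values: Iterator[T],
      budgetBytes: Long,
      sizeOf: T => Long): Either[Iterator[T], Long] = {
    val unrolled = new ArrayBuffer[T]
    var used = 0L
    while (values.hasNext && used <= budgetBytes) {
      val v = values.next()
      used += sizeOf(v)
      unrolled += v
    }
    if (used <= budgetBytes) {
      Right(used)                        // everything fit: cached in memory
    } else {
      Left(unrolled.iterator ++ values)  // already-unrolled values plus the rest, e.g. for disk
    }
  }

  def main(args: Array[String]): Unit = {
    val big = Iterator.fill(1000)(new Array[Byte](1024))
    putIteratorSketch[Array[Byte]](big, 100L * 1024, _.length.toLong) match {
      case Right(size) => println(s"cached $size bytes in memory")
      case Left(rest)  => println(s"memory full; ${rest.size} values left for the disk store")
    }
  }
}
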
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 69e17461df..bbfd6df3b6 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
import scala.language.reflectiveCalls
-import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,8 +32,6 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
private var rootDir1: File = _
private var rootDirs: String = _
- val blockManager = mock(classOf[BlockManager])
- when(blockManager.conf).thenReturn(testConf)
var diskBlockManager: DiskBlockManager = _
override def beforeAll() {
@@ -57,7 +54,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
super.beforeEach()
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
- diskBlockManager = new DiskBlockManager(blockManager, conf)
+ diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
}
override def afterEach() {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
new file mode 100644
index 0000000000..97e74fe706
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class DiskStoreSuite extends SparkFunSuite {
+
+ test("reads of memory-mapped and non memory-mapped files are equivalent") {
+ val confKey = "spark.storage.memoryMapThreshold"
+
+ // Create a non-trivial (not all zeros) byte array
+ val bytes = Array.tabulate[Byte](1000)(_.toByte)
+ val byteBuffer = ByteBuffer.wrap(bytes)
+
+ val blockId = BlockId("rdd_1_2")
+ val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
+
+ val diskStoreMapped = new DiskStore(new SparkConf().set(confKey, "0"), diskBlockManager)
+ diskStoreMapped.putBytes(blockId, byteBuffer)
+ val mapped = diskStoreMapped.getBytes(blockId)
+ assert(diskStoreMapped.remove(blockId))
+
+ val diskStoreNotMapped = new DiskStore(new SparkConf().set(confKey, "1m"), diskBlockManager)
+ diskStoreNotMapped.putBytes(blockId, byteBuffer)
+ val notMapped = diskStoreNotMapped.getBytes(blockId)
+
+ // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+ assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+ "Expected HeapByteBuffer for un-mapped read")
+ assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+
+ def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+ val array = new Array[Byte](in.remaining())
+ in.get(array)
+ array
+ }
+
+ val mappedAsArray = arrayFromByteBuffer(mapped)
+ val notMappedAsArray = arrayFromByteBuffer(notMapped)
+ assert(Arrays.equals(mappedAsArray, bytes))
+ assert(Arrays.equals(notMappedAsArray, bytes))
+ }
+}
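
For context on what this new suite exercises: spark.storage.memoryMapThreshold decides whether a block file is read back as a memory-mapped buffer or as an ordinary heap buffer. The sketch below shows the two underlying java.nio read paths; it is illustrative only, not DiskStore's implementation, and the file name, helper name and thresholds are made up:

// Illustrative java.nio sketch of memory-mapped versus heap reads of a block file.
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

object MappedVsHeapRead {
  def readBlockFile(file: File, memoryMapThresholdBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      if (file.length() >= memoryMapThresholdBytes) {
        // Large file: let the OS page it in lazily through a MappedByteBuffer.
        channel.map(FileChannel.MapMode.READ_ONLY, 0, file.length())
      } else {
        // Small file: a single read into an ordinary heap buffer avoids mmap overhead.
        val buf = ByteBuffer.allocate(file.length().toInt)
        while (buf.hasRemaining && channel.read(buf) != -1) {}
        buf.flip()
        buf
      }
    } finally {
      channel.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("block", ".bin")
    file.deleteOnExit()
    val out = new FileOutputStream(file)
    out.write(Array.tabulate[Byte](1000)(_.toByte))
    out.close()
    val mapped = readBlockFile(file, memoryMapThresholdBytes = 0L)     // forces the mmap path
    val heap = readBlockFile(file, memoryMapThresholdBytes = 1L << 20) // forces the heap path
    println(mapped.getClass.getName) // a MappedByteBuffer subclass
    println(heap.getClass.getName)   // a HeapByteBuffer
  }
}

Either path yields the same bytes, which is exactly the equivalence the test above asserts.
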
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index dd16fc3eca..45424f9bac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -106,7 +106,10 @@ class ReceivedBlockHandlerSuite
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
- blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
+ blockManager
+ .getLocalValues(blockId)
+ .map(_.data.map(_.toString).toList)
+ .getOrElse(List.empty)
}.toList
storedData shouldEqual data
@@ -130,7 +133,10 @@ class ReceivedBlockHandlerSuite
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
- blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
+ blockManager
+ .getLocalValues(blockId)
+ .map(_.data.map(_.toString).toList)
+ .getOrElse(List.empty)
}.toList
storedData shouldEqual data