author    Josh Rosen <joshrosen@databricks.com>    2016-03-10 15:08:41 -0800
committer Andrew Or <andrew@databricks.com>        2016-03-10 15:08:41 -0800
commit    81d48532d954a8aea28d7e1fb3aa32a78c708b63 (patch)
tree      c53bc6199e1807cabfea0a8dac87cb0c7ebbe6e3 /core
parent    3d2b6f56e38ce867ae8819752fd693adab9a8cc9 (diff)
[SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk stores
Today, both the MemoryStore and DiskStore implement a common `BlockStore` API, but I feel that this API is inappropriate because it abstracts away important distinctions between the behavior of these two stores. For instance, the disk store has no notion of storing deserialized objects, so it's confusing for it to expose object-based APIs like putIterator() and getValues() instead of exposing only binary APIs and pushing the responsibilities of serialization and deserialization to the client. Similarly, the DiskStore put() methods accept a `StorageLevel` parameter even though the disk store can only store blocks in one form.

As part of a larger BlockManager interface cleanup, this patch removes the BlockStore interface and refines the MemoryStore and DiskStore interfaces to reflect narrower sets of responsibilities for those components. Some of the benefits of this cleanup are reflected in simplifications to several unit tests (eliminating now-unnecessary mocking), a significant simplification of the BlockManager's `getLocal()` and `doPut()` methods, and a narrower API between the MemoryStore and DiskStore.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11534 from JoshRosen/remove-blockstore-interface.
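For readers skimming the diff, the sketch below condenses the narrowed store APIs that this patch arrives at. The method signatures mirror the DiskStore and MemoryStore changes further down; the trait names themselves are illustrative only and do not exist in Spark.

import java.io.FileOutputStream
import java.nio.ByteBuffer

import org.apache.spark.storage.{BlockId, StorageLevel}

// Hypothetical traits summarizing the post-patch store APIs (signatures copied from
// the diff below; these traits are a sketch, not part of Spark).
trait DiskStoreApiSketch {
  // Binary-only API: serialization is the caller's job, and there is no StorageLevel
  // parameter because the disk store keeps blocks in exactly one form.
  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit
  def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
  def getBytes(blockId: BlockId): ByteBuffer
  def getSize(blockId: BlockId): Long
  def contains(blockId: BlockId): Boolean
  def remove(blockId: BlockId): Boolean
}

trait MemoryStoreApiSketch {
  // Returns true if the (lazily created) bytes fit in storage memory, false otherwise.
  def putBytes(blockId: BlockId, size: Long, bytes: () => ByteBuffer): Boolean
  // Right(estimatedSize) on success; Left(iterator) hands the values back so the
  // BlockManager can decide whether to spill them to disk.
  def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel): Either[Iterator[Any], Long]
  def getBytes(blockId: BlockId): Option[ByteBuffer]      // serialized entries only
  def getValues(blockId: BlockId): Option[Iterator[Any]]  // deserialized entries only
}

The BlockManager's new doPutIterator() in the diff shows the intended use: try memoryStore.putIterator() first and, if it returns Left, stream the returned iterator into diskStore.put().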
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/MemoryManager.scala |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 612
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStore.scala |  63
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala |  15
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 114
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala (renamed from core/src/main/scala/org/apache/spark/storage/MemoryStore.scala) | 170
-rw-r--r--  core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala |  74
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala |   5
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala |  62
16 files changed, 528 insertions, 611 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index dabc810018..550e1ba6d3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
- blockManager.getLocal(broadcastId).map(_.data.next()) match {
+ blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
case Some(x) =>
releaseLock(broadcastId)
x.asInstanceOf[T]
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b5adbd88a2..e89b03e38b 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator
@@ -113,6 +114,7 @@ private[spark] abstract class MemoryManager(
/**
* Release all memory for the given task and mark it as inactive (e.g. when a task ends).
+ *
* @return the number of bytes freed.
*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6a88966f60..1d376adf1a 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
/**
* Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -55,6 +56,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ *
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b38e2ec57f..873330e136 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,24 +40,15 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.storage.memory._
import org.apache.spark.util._
-private[spark] sealed trait BlockValues
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
-
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
val readMethod: DataReadMethod.Value,
val bytes: Long)
-// Class for representing return value of doPut()
-private sealed trait DoPutResult
-private case object DoPutSucceeded extends DoPutResult
-private case object DoPutBytesFailed extends DoPutResult
-private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
-
/**
* Manager running on every node (driver and executors) which provides interfaces for putting and
* retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -78,7 +69,15 @@ private[spark] class BlockManager(
numUsableCores: Int)
extends BlockDataManager with Logging {
- val diskBlockManager = new DiskBlockManager(this, conf)
+ private[spark] val externalShuffleServiceEnabled =
+ conf.getBoolean("spark.shuffle.service.enabled", false)
+
+ val diskBlockManager = {
+ // Only perform cleanup if an external service is not serving our shuffle files.
+ val deleteFilesOnStop =
+ !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
+ new DiskBlockManager(conf, deleteFilesOnStop)
+ }
private[storage] val blockInfoManager = new BlockInfoManager
@@ -86,8 +85,8 @@ private[spark] class BlockManager(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
// Actual storage of where blocks are kept
- private[spark] val memoryStore = new MemoryStore(this, memoryManager)
- private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+ private[spark] val memoryStore = new MemoryStore(conf, this, memoryManager)
+ private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
memoryManager.setMemoryStore(memoryStore)
// Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -96,9 +95,6 @@ private[spark] class BlockManager(
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxStorageMemory
- private[spark]
- val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-
// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
@@ -285,13 +281,9 @@ private[spark] class BlockManager(
if (blockId.isShuffle) {
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
- val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
- .asInstanceOf[Option[ByteBuffer]]
- if (blockBytesOpt.isDefined) {
- val buffer = blockBytesOpt.get
- new BlockManagerManagedBuffer(this, blockId, buffer)
- } else {
- throw new BlockNotFoundException(blockId.toString)
+ getLocalBytes(blockId) match {
+ case Some(buffer) => new BlockManagerManagedBuffer(this, blockId, buffer)
+ case None => throw new BlockNotFoundException(blockId.toString)
}
}
}
@@ -407,11 +399,71 @@ private[spark] class BlockManager(
}
/**
- * Get block from local block manager.
+ * Get block from local block manager as an iterator of Java objects.
*/
- def getLocal(blockId: BlockId): Option[BlockResult] = {
+ def getLocalValues(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
- doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+ blockInfoManager.lockForReading(blockId) match {
+ case None =>
+ logDebug(s"Block $blockId was not found")
+ None
+ case Some(info) =>
+ val level = info.level
+ logDebug(s"Level for block $blockId is $level")
+ if (level.useMemory && memoryStore.contains(blockId)) {
+ val iter: Iterator[Any] = if (level.deserialized) {
+ memoryStore.getValues(blockId).get
+ } else {
+ dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+ }
+ val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+ Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
+ } else if (level.useDisk && diskStore.contains(blockId)) {
+ val iterToReturn: Iterator[Any] = {
+ val diskBytes = diskStore.getBytes(blockId)
+ if (level.deserialized) {
+ val diskIterator = dataDeserialize(blockId, diskBytes)
+ if (level.useMemory) {
+ // Cache the values before returning them
+ memoryStore.putIterator(blockId, diskIterator, level) match {
+ case Left(iter) =>
+ // The memory store put() failed, so it returned the iterator back to us:
+ iter
+ case Right(_) =>
+ // The put() succeeded, so we can read the values back:
+ memoryStore.getValues(blockId).get
+ }
+ } else {
+ diskIterator
+ }
+ } else { // storage level is serialized
+ if (level.useMemory) {
+ // Cache the bytes back into memory to speed up subsequent reads.
+ val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we
+ // cannot put it into MemoryStore, copyForMemory should not be created. That's why
+ // this action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+ copyForMemory.put(diskBytes)
+ })
+ if (putSucceeded) {
+ dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+ } else {
+ dataDeserialize(blockId, diskBytes)
+ }
+ } else {
+ dataDeserialize(blockId, diskBytes)
+ }
+ }
+ }
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } else {
+ releaseLock(blockId)
+ throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+ }
+ }
}
/**
@@ -428,77 +480,44 @@ private[spark] class BlockManager(
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
- doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
- }
- }
-
- private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
- blockInfoManager.lockForReading(blockId) match {
- case None =>
- logDebug(s"Block $blockId was not found")
- None
- case Some(info) =>
- doGetLocal(blockId, info, asBlockResult)
+ blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}
}
/**
- * Get a local block from the block manager.
- * Assumes that the caller holds a read lock on the block.
+ * Get block from the local block manager as serialized bytes.
+ *
+ * Must be called while holding a read lock on the block.
+ * Releases the read lock upon exception; keeps the read lock upon successful return.
*/
- private def doGetLocal(
- blockId: BlockId,
- info: BlockInfo,
- asBlockResult: Boolean): Option[Any] = {
+ private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
val level = info.level
logDebug(s"Level for block $blockId is $level")
-
- // Look for the block in memory
- if (level.useMemory) {
- logDebug(s"Getting block $blockId from memory")
- val result = if (asBlockResult) {
- memoryStore.getValues(blockId).map { iter =>
- val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
- new BlockResult(ci, DataReadMethod.Memory, info.size)
- }
+ // In order, try to read the serialized bytes from memory, then from disk, then fall back to
+ // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
+ if (level.deserialized) {
+ // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
+ if (level.useDisk && diskStore.contains(blockId)) {
+ // Note: we purposely do not try to put the block back into memory here. Since this branch
+ // handles deserialized blocks, this block may only be cached in memory as objects, not
+ // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
+ // cache the block's deserialized objects since that caching may not have a payoff.
+ diskStore.getBytes(blockId)
+ } else if (level.useMemory && memoryStore.contains(blockId)) {
+ // The block was not found on disk, so serialize an in-memory copy:
+ dataSerialize(blockId, memoryStore.getValues(blockId).get)
} else {
- memoryStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in memory")
- }
- }
-
- // Look for block on disk, potentially storing it back in memory if required
- if (level.useDisk) {
- logDebug(s"Getting block $blockId from disk")
- val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
- case Some(b) => b
- case None =>
- releaseLock(blockId)
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
+ releaseLock(blockId)
+ throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}
- assert(0 == bytes.position())
-
- if (!level.useMemory) {
- // If the block shouldn't be stored in memory, we can just return it
- if (asBlockResult) {
- val iter = CompletionIterator[Any, Iterator[Any]](
- dataDeserialize(blockId, bytes), releaseLock(blockId))
- return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
- } else {
- return Some(bytes)
- }
- } else {
- // Otherwise, we also have to store something in the memory store
- if (!level.deserialized && !asBlockResult) {
- /* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized" and we requested its serialized bytes. */
- memoryStore.putBytes(blockId, bytes.limit, () => {
+ } else { // storage level is serialized
+ if (level.useMemory && memoryStore.contains(blockId)) {
+ memoryStore.getBytes(blockId).get
+ } else if (level.useDisk && diskStore.contains(blockId)) {
+ val bytes = diskStore.getBytes(blockId)
+ if (level.useMemory) {
+ // Cache the bytes back into memory to speed up subsequent reads.
+ val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
// put it into MemoryStore, copyForMemory should not be created. That's why this
@@ -506,39 +525,19 @@ private[spark] class BlockManager(
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
- bytes.rewind()
- }
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- val values = dataDeserialize(blockId, bytes)
- val valuesToReturn: Iterator[Any] = {
- if (level.deserialized) {
- // Cache the values before returning them
- memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
- case Left(iter) =>
- // The memory store put() failed, so it returned the iterator back to us:
- iter
- case Right(_) =>
- // The put() succeeded, so we can read the values back:
- memoryStore.getValues(blockId).get
- }
- } else {
- values
- }
+ if (memoryStorePutSucceeded) {
+ memoryStore.getBytes(blockId).get
+ } else {
+ bytes.rewind()
+ bytes
}
- val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
- return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } else {
+ bytes
}
+ } else {
+ releaseLock(blockId)
+ throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}
- } else {
- // This branch represents a case where the BlockInfoManager contained an entry for
- // the block but the block could not be found in any of the block stores. This case
- // should never occur, but for completeness's sake we address it here.
- logError(
- s"Block $blockId is supposedly stored locally but was not found in any block store")
- releaseLock(blockId)
- None
}
}
@@ -547,17 +546,10 @@ private[spark] class BlockManager(
*
* This does not acquire a lock on this block in this JVM.
*/
- def getRemote(blockId: BlockId): Option[BlockResult] = {
- logDebug(s"Getting remote block $blockId")
- doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
- }
-
- /**
- * Get block from remote block managers as serialized bytes.
- */
- def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
- logDebug(s"Getting remote block $blockId as bytes")
- doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+ def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+ getRemoteBytes(blockId).map { data =>
+ new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
+ }
}
/**
@@ -570,7 +562,11 @@ private[spark] class BlockManager(
preferredLocs ++ otherLocs
}
- private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
+ /**
+ * Get block from remote block managers as serialized bytes.
+ */
+ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+ logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
val locations = getLocations(blockId)
var numFetchFailures = 0
@@ -595,14 +591,7 @@ private[spark] class BlockManager(
}
if (data != null) {
- if (asBlockResult) {
- return Some(new BlockResult(
- dataDeserialize(blockId, data),
- DataReadMethod.Network,
- data.limit()))
- } else {
- return Some(data)
- }
+ return Some(data)
}
logDebug(s"The value of block $blockId is null")
}
@@ -618,12 +607,12 @@ private[spark] class BlockManager(
* automatically be freed once the result's `data` iterator is fully consumed.
*/
def get(blockId: BlockId): Option[BlockResult] = {
- val local = getLocal(blockId)
+ val local = getLocalValues(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
- val remote = getRemote(blockId)
+ val remote = getRemoteValues(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
@@ -673,24 +662,26 @@ private[spark] class BlockManager(
level: StorageLevel,
makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
// Initially we hold no locks on this block.
- doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
- case DoPutSucceeded =>
+ doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+ case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
- val blockResult = get(blockId).getOrElse {
+ val blockResult = getLocalValues(blockId).getOrElse {
// Since we held a read lock between the doPut() and get() calls, the block should not
// have been evicted, so get() not returning the block indicates some internal error.
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
+ // We already hold a read lock on the block from the doPut() call and getLocalValues()
+ // acquires the lock again, so we need to call releaseLock() here so that the net number
+ // of lock acquisitions is 1 (since the caller will only call release() once).
+ releaseLock(blockId)
Left(blockResult)
- case DoPutIteratorFailed(iter) =>
+ case Some(iter) =>
// The put failed, likely because the data was too large to fit in memory and could not be
// dropped to disk. Therefore, we need to pass the input iterator back to the caller so
// that they can decide what to do with the values (e.g. process them without caching).
Right(iter)
- case DoPutBytesFailed =>
- throw new SparkException("doPut returned an invalid failure response")
}
}
@@ -701,16 +692,10 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+ tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
- val result = doPut(
- blockId,
- IteratorValues(() => values),
- level,
- tellMaster,
- effectiveStorageLevel)
- result == DoPutSucceeded
+ // If doPut() didn't hand work back to us, then block already existed or was successfully stored
+ doPutIterator(blockId, () => values, level, tellMaster).isEmpty
}
/**
@@ -739,46 +724,105 @@ private[spark] class BlockManager(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
- tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+ tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
- val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
- result == DoPutSucceeded
+ doPutBytes(blockId, bytes, level, tellMaster)
}
/**
- * Put the given block according to the given level in one of the block stores, replicating
+ * Put the given bytes according to the given level in one of the block stores, replicating
* the values if necessary.
*
* If the block already exists, this method will not overwrite it.
*
- * @param effectiveStorageLevel the level according to which the block will actually be handled.
- * This allows the caller to specify an alternate behavior of doPut
- * while preserving the original level specified by the user.
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
- * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
- * [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
- * [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
+ * @return true if the block was already present or if the put succeeded, false otherwise.
*/
- private def doPut(
+ private def doPutBytes(
blockId: BlockId,
- data: BlockValues,
+ bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None,
- keepReadLock: Boolean = false): DoPutResult = {
+ keepReadLock: Boolean = false): Boolean = {
+ doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ val startTimeMs = System.currentTimeMillis
+ // Since we're storing bytes, initiate the replication before storing them locally.
+ // This is faster as data is already serialized and ready to send.
+ val replicationFuture = if (level.replication > 1) {
+ // Duplicate doesn't copy the bytes, but just creates a wrapper
+ val bufferView = bytes.duplicate()
+ Future {
+ // This is a blocking action and should run in futureExecutionContext which is a cached
+ // thread pool
+ replicate(blockId, bufferView, level)
+ }(futureExecutionContext)
+ } else {
+ null
+ }
+
+ bytes.rewind()
+ val size = bytes.limit()
+
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ val values = dataDeserialize(blockId, bytes.duplicate())
+ memoryStore.putIterator(blockId, values, level).isRight
+ } else {
+ memoryStore.putBytes(blockId, size, () => bytes)
+ }
+ if (!putSucceeded && level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.putBytes(blockId, bytes)
+ }
+ } else if (level.useDisk) {
+ diskStore.putBytes(blockId, bytes)
+ }
+
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
+ // tell the master about it.
+ putBlockInfo.size = size
+ if (tellMaster) {
+ reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+ }
+ }
+ logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+ if (level.replication > 1) {
+ // Wait for asynchronous replication to finish
+ Await.ready(replicationFuture, Duration.Inf)
+ }
+ if (blockWasSuccessfullyStored) {
+ None
+ } else {
+ Some(bytes)
+ }
+ }.isEmpty
+ }
+
+ /**
+ * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
+ *
+ * @param putBody a function which attempts the actual put() and returns None on success
+ * or Some on failure.
+ */
+ private def doPut[T](
+ blockId: BlockId,
+ level: StorageLevel,
+ tellMaster: Boolean,
+ keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
- effectiveStorageLevel.foreach { level =>
- require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
- }
- /* Remember the block's storage level so that we can correctly drop it to disk if it needs
- * to be dropped right after it got put into memory. Note, however, that other threads will
- * not be able to get() this block until we call markReady on its BlockInfo. */
val putBlockInfo = {
val newInfo = new BlockInfo(level, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
@@ -789,138 +833,113 @@ private[spark] class BlockManager(
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
- return DoPutSucceeded
+ return None
}
}
val startTimeMs = System.currentTimeMillis
-
- // Size of the block in bytes
- var size = 0L
-
- // The level we actually use to put the block
- val putLevel = effectiveStorageLevel.getOrElse(level)
-
- // If we're storing bytes, then initiate the replication before storing them locally.
- // This is faster as data is already serialized and ready to send.
- val replicationFuture = data match {
- case b: ByteBufferValues if putLevel.replication > 1 =>
- // Duplicate doesn't copy the bytes, but just creates a wrapper
- val bufferView = b.buffer.duplicate()
- Future {
- // This is a blocking action and should run in futureExecutionContext which is a cached
- // thread pool
- replicate(blockId, bufferView, putLevel)
- }(futureExecutionContext)
- case _ => null
- }
-
- var blockWasSuccessfullyStored = false
- var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
-
- putBlockInfo.synchronized {
- logTrace("Put for block %s took %s to get into synchronized block"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
- try {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- data match {
- case IteratorValues(iterator) =>
- memoryStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) =>
- size = s
- case Left(iter) =>
- iteratorFromFailedMemoryStorePut = Some(iter)
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- memoryStore.putBytes(blockId, bytes, putLevel)
- }
- } else if (putLevel.useDisk) {
- data match {
- case IteratorValues(iterator) =>
- diskStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) =>
- size = s
- // putIterator() will never return Left (see its return type).
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- diskStore.putBytes(blockId, bytes, putLevel)
- }
+ var blockWasSuccessfullyStored: Boolean = false
+ val result: Option[T] = try {
+ val res = putBody(putBlockInfo)
+ blockWasSuccessfullyStored = res.isEmpty
+ res
+ } finally {
+ if (blockWasSuccessfullyStored) {
+ if (keepReadLock) {
+ blockInfoManager.downgradeLock(blockId)
} else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying storage level!")
+ blockInfoManager.unlock(blockId)
}
-
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
- blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
- if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // let other threads read it, and tell the master about it.
- putBlockInfo.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
- }
- }
- } finally {
- if (blockWasSuccessfullyStored) {
- if (keepReadLock) {
- blockInfoManager.downgradeLock(blockId)
- } else {
- blockInfoManager.unlock(blockId)
- }
- } else {
- blockInfoManager.removeBlock(blockId)
- logWarning(s"Putting block $blockId failed")
- }
- }
- }
- logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
- if (replicationFuture != null) {
- // Wait for asynchronous replication to finish
- Await.ready(replicationFuture, Duration.Inf)
- } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
- val remoteStartTime = System.currentTimeMillis
- val bytesToReplicate: ByteBuffer = {
- doGetLocal(blockId, putBlockInfo, asBlockResult = false)
- .map(_.asInstanceOf[ByteBuffer])
- .getOrElse {
- throw new SparkException(s"Block $blockId was not found even though it was just stored")
- }
- }
- try {
- replicate(blockId, bytesToReplicate, putLevel)
- } finally {
- BlockManager.dispose(bytesToReplicate)
+ } else {
+ blockInfoManager.removeBlock(blockId)
+ logWarning(s"Putting block $blockId failed")
}
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
-
- if (putLevel.replication > 1) {
+ if (level.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
+ result
+ }
- if (blockWasSuccessfullyStored) {
- DoPutSucceeded
- } else if (iteratorFromFailedMemoryStorePut.isDefined) {
- DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
- } else {
- DoPutBytesFailed
+ /**
+ * Put the given block according to the given level in one of the block stores, replicating
+ * the values if necessary.
+ *
+ * If the block already exists, this method will not overwrite it.
+ *
+ * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+ * block already exists). If false, this method will hold no locks when it
+ * returns.
+ * @return None if the block was already present or if the put succeeded, or Some(iterator)
+ * if the put failed.
+ */
+ private def doPutIterator(
+ blockId: BlockId,
+ iterator: () => Iterator[Any],
+ level: StorageLevel,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+ doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ val startTimeMs = System.currentTimeMillis
+ var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+ // Size of the block in bytes
+ var size = 0L
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ memoryStore.putIterator(blockId, iterator(), level) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ // Not enough space to unroll this block; drop to disk if applicable
+ if (level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, iter)
+ }
+ size = diskStore.getSize(blockId)
+ } else {
+ iteratorFromFailedMemoryStorePut = Some(iter)
+ }
+ }
+ } else if (level.useDisk) {
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, iterator())
+ }
+ size = diskStore.getSize(blockId)
+ }
+
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
+ // tell the master about it.
+ putBlockInfo.size = size
+ if (tellMaster) {
+ reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+ }
+ logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+ if (level.replication > 1) {
+ val remoteStartTime = System.currentTimeMillis
+ val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+ try {
+ replicate(blockId, bytesToReplicate, level)
+ } finally {
+ BlockManager.dispose(bytesToReplicate)
+ }
+ logDebug("Put block %s remotely took %s"
+ .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+ }
+ }
+ assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
+ iteratorFromFailedMemoryStorePut
}
}
@@ -1077,9 +1096,11 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
- diskStore.putIterator(blockId, elements.toIterator, level)
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+ }
case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
+ diskStore.putBytes(blockId, bytes)
}
blockIsUpdated = true
}
@@ -1229,7 +1250,6 @@ private[spark] class BlockManager(
rpcEnv.stop(slaveEndpoint)
blockInfoManager.clear()
memoryStore.clear()
- diskStore.clear()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
deleted file mode 100644
index b069918b16..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.Logging
-
-/**
- * Abstract class to store blocks.
- */
-private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
-
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
-
- /**
- * Attempt to store an iterator of values.
- *
- * @return an iterator of values (in case the put failed), or the estimated size of the stored
- * values if the put succeeded.
- */
- def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[Iterator[Any], Long]
-
- /**
- * Return the size of a block in bytes.
- */
- def getSize(blockId: BlockId): Long
-
- def getBytes(blockId: BlockId): Option[ByteBuffer]
-
- def getValues(blockId: BlockId): Option[Iterator[Any]]
-
- /**
- * Remove a block, if it exists.
- *
- * @param blockId the block to remove.
- * @return True if the block was found and removed, False otherwise.
- * @throws IllegalStateException if the block is pinned by a task.
- */
- def remove(blockId: BlockId): Boolean
-
- def contains(blockId: BlockId): Boolean
-
- def clear() { }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 4daf22f714..e51d96e57b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -26,18 +26,14 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
- * locations. By default, one block is mapped to one file with a name given by its BlockId.
- * However, it is also possible to have a block map to only a segment of a file, by calling
- * mapBlockToFileSegment().
+ * locations. One block is mapped to one file with a name given by its BlockId.
*
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
-private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
- extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
- private[spark]
- val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+ private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -163,10 +159,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private def doStop(): Unit = {
- // Only perform cleanup if an external service is not serving our shuffle files.
- // Also blockManagerId could be null if block manager is not initialized properly.
- if (!blockManager.externalShuffleServiceEnabled ||
- (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
+ if (deleteFilesOnStop) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
try {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e35aa1b068..caecd97a0b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,112 +17,100 @@
package org.apache.spark.storage
-import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
+import java.io.{FileOutputStream, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
-import org.apache.spark.Logging
+import com.google.common.io.Closeables
+
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on disk.
*/
-private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
- extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
- val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+ private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
- override def getSize(blockId: BlockId): Long = {
+ def getSize(blockId: BlockId): Long = {
diskManager.getFile(blockId.name).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val bytes = _bytes.duplicate()
+ /**
+ * Invokes the provided callback function to write the specific block.
+ *
+ * @throws IllegalStateException if the block already exists in the disk store.
+ */
+ def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+ if (contains(blockId)) {
+ throw new IllegalStateException(s"Block $blockId is already present in the disk store")
+ }
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
- val channel = new FileOutputStream(file).getChannel
- Utils.tryWithSafeFinally {
- while (bytes.remaining > 0) {
- channel.write(bytes)
+ val fileOutputStream = new FileOutputStream(file)
+ var threwException: Boolean = true
+ try {
+ writeFunc(fileOutputStream)
+ threwException = false
+ } finally {
+ try {
+ Closeables.close(fileOutputStream, threwException)
+ } finally {
+ if (threwException) {
+ remove(blockId)
+ }
}
- } {
- channel.close()
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+ file.getName,
+ Utils.bytesToString(file.length()),
+ finishTime - startTime))
}
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Right[Iterator[Any], Long] = {
- logDebug(s"Attempting to write values for block $blockId")
- val startTime = System.currentTimeMillis
- val file = diskManager.getFile(blockId)
- val outputStream = new FileOutputStream(file)
- try {
+ def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
+ // So that we do not modify the input offsets !
+ // duplicate does not copy buffer, so inexpensive
+ val bytes = _bytes.duplicate()
+ put(blockId) { fileOutputStream =>
+ val channel = fileOutputStream.getChannel
Utils.tryWithSafeFinally {
- blockManager.dataSerializeStream(blockId, outputStream, values)
+ while (bytes.remaining > 0) {
+ channel.write(bytes)
+ }
} {
- // Close outputStream here because it should be closed before file is deleted.
- outputStream.close()
+ channel.close()
}
- } catch {
- case e: Throwable =>
- if (file.exists()) {
- if (!file.delete()) {
- logWarning(s"Error deleting ${file}")
- }
- }
- throw e
}
-
- val length = file.length
-
- val timeTaken = System.currentTimeMillis - startTime
- logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(length), timeTaken))
-
- Right(length)
}
- private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
+ def getBytes(blockId: BlockId): ByteBuffer = {
+ val file = diskManager.getFile(blockId.name)
val channel = new RandomAccessFile(file, "r").getChannel
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
- if (length < minMemoryMapBytes) {
- val buf = ByteBuffer.allocate(length.toInt)
- channel.position(offset)
+ if (file.length < minMemoryMapBytes) {
+ val buf = ByteBuffer.allocate(file.length.toInt)
+ channel.position(0)
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException("Reached EOF before filling buffer\n" +
- s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+ s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
}
}
buf.flip()
- Some(buf)
+ buf
} else {
- Some(channel.map(MapMode.READ_ONLY, offset, length))
+ channel.map(MapMode.READ_ONLY, 0, file.length)
}
} {
channel.close()
}
}
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = diskManager.getFile(blockId.name)
- getBytes(file, 0, file.length)
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
-
- override def remove(blockId: BlockId): Boolean = {
+ def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
if (file.exists()) {
val ret = file.delete()
@@ -135,7 +123,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
}
}
- override def contains(blockId: BlockId): Boolean = {
+ def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index bb72fe4bca..a80b2357ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.storage
+package org.apache.spark.storage.memory
import java.nio.ByteBuffer
import java.util.LinkedHashMap
@@ -23,8 +23,9 @@ import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.TaskContext
+import org.apache.spark.{Logging, SparkConf, TaskContext}
import org.apache.spark.memory.MemoryManager
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
@@ -34,13 +35,15 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
* serialized ByteBuffers.
*/
-private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
- extends BlockStore(blockManager) {
+private[spark] class MemoryStore(
+ conf: SparkConf,
+ blockManager: BlockManager,
+ memoryManager: MemoryManager)
+ extends Logging {
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
- private val conf = blockManager.conf
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
@@ -81,32 +84,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
memoryUsed - currentUnrollMemory
}
- override def getSize(blockId: BlockId): Long = {
+ def getSize(blockId: BlockId): Long = {
entries.synchronized {
entries.get(blockId).size
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
- require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- // Work on a duplicate - since the original input might be used elsewhere.
- val bytes = _bytes.duplicate()
- bytes.rewind()
- if (level.deserialized) {
- val values = blockManager.dataDeserialize(blockId, bytes)
- putIterator(blockId, values, level)
- } else {
- tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- }
- }
-
/**
* Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
* put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
*
* The caller should guarantee that `size` is correct.
+ *
+ * @return true if the put() succeeded, false otherwise.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
@@ -114,89 +106,70 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (putSuccess) {
assert(bytes.limit == size)
}
- }
-
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[Iterator[Any], Long] = {
- require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- putIterator(blockId, values, level, allowPersistToDisk = true)
+ putSuccess
}
/**
* Attempt to put the given block in memory store.
*
- * There may not be enough space to fully unroll the iterator in memory, in which case we
- * optionally drop the values to disk if
- * (1) the block's storage level specifies useDisk, and
- * (2) `allowPersistToDisk` is true.
- *
- * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
- * back from disk and attempts to cache it in memory. In this case, we should not persist the
- * block back on disk again, as it is already in disk store.
+ * @return the estimated size of the stored data if the put() succeeded, or an iterator
+ * in case the put() failed (the returned iterator lets callers fall back to the disk
+ * store if desired).
*/
private[storage] def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel,
- allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+ level: StorageLevel): Either[Iterator[Any], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
- val size = {
- if (level.deserialized) {
- val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
- tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
- sizeEstimate
+ if (level.deserialized) {
+ val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+ if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) {
+ Right(sizeEstimate)
} else {
- val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
- tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- bytes.limit()
+ Left(arrayValues.toIterator)
}
- }
- Right(size)
- case Right(iteratorValues) =>
- // Not enough space to unroll this block; drop to disk if applicable
- if (level.useDisk && allowPersistToDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- blockManager.diskStore.putIterator(blockId, iteratorValues, level)
} else {
- Left(iteratorValues)
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+ if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) {
+ Right(bytes.limit())
+ } else {
+ Left(arrayValues.toIterator)
+ }
}
+ case Right(iteratorValues) =>
+ Left(iteratorValues)
}
}
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
None
- } else if (entry.deserialized) {
- Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
} else {
+ require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form")
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
}
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ def getValues(blockId: BlockId): Option[Iterator[Any]] = {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
None
- } else if (entry.deserialized) {
- Some(entry.value.asInstanceOf[Array[Any]].iterator)
} else {
- val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
- Some(blockManager.dataDeserialize(blockId, buffer))
+ require(entry.deserialized, "should only call getValues on deserialized blocks")
+ Some(entry.value.asInstanceOf[Array[Any]].iterator)
}
}
- override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
+ def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
val entry = entries.synchronized {
entries.remove(blockId)
}
@@ -210,7 +183,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def clear(): Unit = memoryManager.synchronized {
+ def clear(): Unit = memoryManager.synchronized {
entries.synchronized {
entries.clear()
}
@@ -323,14 +296,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
- * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
- * created to avoid OOM since it may be a big ByteBuffer.
- *
- * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
- * dropping is done by only on thread at a time. Otherwise while one thread is dropping
- * blocks to free memory for one block, another thread may use up the freed space for
- * another block.
- *
* @return whether put was successful.
*/
private def tryToPut(
@@ -338,42 +303,33 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
value: () => Any,
size: Long,
deserialized: Boolean): Boolean = {
+ val acquiredEnoughStorageMemory = {
+ // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another
+ // task.
+ memoryManager.synchronized {
+ // Note: if we have previously unrolled this block successfully, then pending unroll
+ // memory should be non-zero. This is the amount that we already reserved during the
+ // unrolling process. In this case, we can just reuse this space to cache our block.
+ // The synchronization on `memoryManager` here guarantees that the release and acquire
+ // happen atomically. This relies on the assumption that all memory acquisitions are
+ // synchronized on the same lock.
+ releasePendingUnrollMemoryForThisTask()
+ memoryManager.acquireStorageMemory(blockId, size)
+ }
+ }
- /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
- * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
- * been released, it must be ensured that those to-be-dropped blocks are not double counted
- * for freeing up more space for another block that needs to be put. Only then the actually
- * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
-
- memoryManager.synchronized {
- // Note: if we have previously unrolled this block successfully, then pending unroll
- // memory should be non-zero. This is the amount that we already reserved during the
- // unrolling process. In this case, we can just reuse this space to cache our block.
- // The synchronization on `memoryManager` here guarantees that the release and acquire
- // happen atomically. This relies on the assumption that all memory acquisitions are
- // synchronized on the same lock.
- releasePendingUnrollMemoryForThisTask()
- val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
- if (enoughMemory) {
- // We acquired enough memory for the block, so go ahead and put it
- val entry = new MemoryEntry(value(), size, deserialized)
- entries.synchronized {
- entries.put(blockId, entry)
- }
- val valuesOrBytes = if (deserialized) "values" else "bytes"
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
- blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
- } else {
- // Tell the block manager that we couldn't put it in memory so that it can drop it to
- // disk if the block allows disk storage.
- lazy val data = if (deserialized) {
- Left(value().asInstanceOf[Array[Any]])
- } else {
- Right(value().asInstanceOf[ByteBuffer].duplicate())
- }
- blockManager.dropFromMemory(blockId, () => data)
+ if (acquiredEnoughStorageMemory) {
+ // We acquired enough memory for the block, so go ahead and put it
+ val entry = new MemoryEntry(value(), size, deserialized)
+ entries.synchronized {
+ entries.put(blockId, entry)
}
- enoughMemory
+ val valuesOrBytes = if (deserialized) "values" else "bytes"
+ logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+ blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+ true
+ } else {
+ false
}
}
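The hunk above splits tryToPut into two phases: a short critical section on memoryManager that releases any pending unroll memory and attempts to acquire storage memory, followed by the entries insertion, which happens only once that acquisition succeeds. On failure the method now simply returns false; the old path that eagerly called blockManager.dropFromMemory is gone, so deciding whether to spill the block to disk is left to the caller. A condensed sketch of the new control flow, reusing only members visible in the hunk (illustrative, not a drop-in replacement):

    private def tryToPutSketch(
        blockId: BlockId,
        value: () => Any,
        size: Long,
        deserialized: Boolean): Boolean = {
      // Phase 1: under the memoryManager lock, trade pending unroll memory for storage memory.
      val acquired = memoryManager.synchronized {
        releasePendingUnrollMemoryForThisTask()
        memoryManager.acquireStorageMemory(blockId, size)
      }
      // Phase 2: materialize value() and record the entry only if memory was granted.
      if (acquired) {
        entries.synchronized {
          entries.put(blockId, new MemoryEntry(value(), size, deserialized))
        }
      }
      acquired
    }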
@@ -455,7 +411,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def contains(blockId: BlockId): Boolean = {
+ def contains(blockId: BlockId): Boolean = {
entries.synchronized { entries.containsKey(blockId) }
}
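Taken together, the MemoryStore hunks narrow its surface: getValues now insists on a deserialized entry (the branch that duplicated a ByteBuffer and deserialized it on the fly is removed), while remove, clear and contains drop their override modifiers because there is no BlockStore parent left to override. A caller-side sketch of the tightened read contract; the getBytes companion accessor and its Option[ByteBuffer] return type are assumptions not shown in this excerpt:

    // Sketch: choosing an accessor after this patch, assuming code inside org.apache.spark.storage.
    def readFromMemorySketch(
        store: MemoryStore,
        blockId: BlockId,
        deserialized: Boolean): Option[Any] = {
      if (deserialized) {
        store.getValues(blockId)   // Option[Iterator[Any]]; only legal for deserialized entries
      } else {
        store.getBytes(blockId)    // assumed Option[ByteBuffer]; deserializing is the caller's job
      }
    }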
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 88fdbbdaec..f97cfbba32 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -37,7 +37,7 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
rdd.map { x =>
val bm = SparkEnv.get.blockManager
// Check if broadcast block was fetched
- val isFound = bm.getLocal(BroadcastBlockId(bid)).isDefined
+ val isFound = bm.getLocalValues(BroadcastBlockId(bid)).isDefined
(x, isFound)
}.collect().toSet
}
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index d9764c7c10..686e948b5d 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -31,7 +31,8 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel}
+import org.apache.spark.storage.memory.MemoryStore
/**
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index eee78d396e..741d4fdf78 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import org.mockito.Mockito.when
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9686c6621b..9001a26652 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
private val dummyBlock = TestBlockId("--")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index ae1faf5d98..b78a3648cd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -366,7 +366,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
testStore => blockLocations.contains(testStore.blockManagerId.executorId)
}.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
- assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+ assert(
+ testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
testStore.releaseLock(blockId)
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0485b0501c..42595c8cf2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,8 +17,7 @@
package org.apache.spark.storage
-import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.Arrays
+import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -614,11 +613,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("a3", a3, storageLevel)
assert(accessMethod("a2").isDefined, "a2 was not in store")
assert(accessMethod("a3").isDefined, "a3 was not in store")
- assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
assert(accessMethod("a1").isDefined, "a1 was not in store")
val dataShouldHaveBeenCachedBackIntoMemory = {
- if (storageLevel.deserialized) !getAsBytes
- else getAsBytes
+ if (storageLevel.deserialized) {
+ !getAsBytes
+ } else {
+ // If the block's storage level is serialized, then always cache the bytes in memory, even
+ // if the caller requested values.
+ true
+ }
}
if (dataShouldHaveBeenCachedBackIntoMemory) {
assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
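The rewritten expectation boils down to one rule: a deserialized block is re-cached in memory only when values were requested, while a serialized block's bytes are re-cached regardless of how it was read. An equivalent one-line form (illustrative only):

    val dataShouldHaveBeenCachedBackIntoMemory = !storageLevel.deserialized || !getAsBytes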
@@ -735,7 +738,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
- assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+ assert(!store.memoryStore.contains("a2"), "a2 was in memory store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
}
@@ -829,50 +832,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}
- test("reads of memory-mapped and non memory-mapped files are equivalent") {
- val confKey = "spark.storage.memoryMapThreshold"
-
- // Create a non-trivial (not all zeros) byte array
- var counter = 0.toByte
- def incr: Byte = {counter = (counter + 1).toByte; counter;}
- val bytes = Array.fill[Byte](1000)(incr)
- val byteBuffer = ByteBuffer.wrap(bytes)
-
- val blockId = BlockId("rdd_1_2")
-
- // This sequence of mocks makes these tests fairly brittle. It would
- // be nice to refactor classes involved in disk storage in a way that
- // allows for easier testing.
- val blockManager = mock(classOf[BlockManager])
- when(blockManager.conf).thenReturn(conf.clone.set(confKey, "0"))
- val diskBlockManager = new DiskBlockManager(blockManager, conf)
-
- val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
- diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
- val mapped = diskStoreMapped.getBytes(blockId).get
-
- when(blockManager.conf).thenReturn(conf.clone.set(confKey, "1m"))
- val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
- diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
- val notMapped = diskStoreNotMapped.getBytes(blockId).get
-
- // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
- assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
- "Expected HeapByteBuffer for un-mapped read")
- assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
-
- def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
- val array = new Array[Byte](in.remaining())
- in.get(array)
- array
- }
-
- val mappedAsArray = arrayFromByteBuffer(mapped)
- val notMappedAsArray = arrayFromByteBuffer(notMapped)
- assert(Arrays.equals(mappedAsArray, bytes))
- assert(Arrays.equals(notMappedAsArray, bytes))
- }
-
test("updated block statuses") {
store = makeBlockManager(12000)
store.registerTask(0)
@@ -1232,19 +1191,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- // Unroll huge block with not enough space. This should fail and drop the new block to disk
- // directly in addition to kicking out b2 in the process. Memory store should contain only
- // b3, while disk store should contain b1, b2 and b4.
+ // Unroll huge block with not enough space. This should fail and return an iterator so that
+ // the block may be stored to disk. During the unrolling process, block "b2" should be kicked
+ // out, so the memory store should contain only b3, while the disk store should contain
+ // b1, b2 and b4.
val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
- assert(result4.isRight)
+ assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
- assert(diskStore.contains("b1"))
- assert(diskStore.contains("b2"))
- assert(!diskStore.contains("b3"))
- assert(diskStore.contains("b4"))
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
}
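The updated assertions reflect the new putIterator contract: rather than dropping an oversized block to disk itself, the memory store reports failure through the Either it returns, and the iterator carried by Left lets the caller resume from where unrolling stopped. A caller-side sketch, assuming Left holds the leftover value iterator and Right the stored size in bytes (payload types inferred from the assertions above, not spelled out in this excerpt):

    def cacheOrSpillSketch(
        memoryStore: MemoryStore,
        blockId: BlockId,
        values: Iterator[Any],
        level: StorageLevel): Unit = {
      memoryStore.putIterator(blockId, values, level) match {
        case Right(storedSize) =>
          // Fully unrolled and cached; storedSize is the estimated in-memory size.
          ()
        case Left(leftoverIterator) =>
          // Unrolling failed. The memory store no longer spills for us: when level.useDisk is
          // set, the caller serializes leftoverIterator and hands the bytes to the disk store.
          ()
      }
    }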
@@ -1366,7 +1322,7 @@ private object BlockManagerSuite {
getLocalAndReleaseLock(blockId).isDefined
}
- val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+ val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 69e17461df..bbfd6df3b6 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
import scala.language.reflectiveCalls
-import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,8 +32,6 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
private var rootDir1: File = _
private var rootDirs: String = _
- val blockManager = mock(classOf[BlockManager])
- when(blockManager.conf).thenReturn(testConf)
var diskBlockManager: DiskBlockManager = _
override def beforeAll() {
@@ -57,7 +54,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
super.beforeEach()
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
- diskBlockManager = new DiskBlockManager(blockManager, conf)
+ diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
}
override def afterEach() {
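With the BlockManager dependency removed, DiskBlockManager is built from a SparkConf plus an explicit deleteFilesOnStop flag, which is why the mock above could be deleted. Constructing the disk layer for a test now looks roughly like this (a sketch; the local-dir value is a placeholder and the code assumes the same package-private access the suites have):

    val conf = new SparkConf().set("spark.local.dir", "/tmp/spark-test-dirs") // placeholder path
    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
    val diskStore = new DiskStore(conf, diskBlockManager)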
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
new file mode 100644
index 0000000000..97e74fe706
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class DiskStoreSuite extends SparkFunSuite {
+
+ test("reads of memory-mapped and non memory-mapped files are equivalent") {
+ val confKey = "spark.storage.memoryMapThreshold"
+
+ // Create a non-trivial (not all zeros) byte array
+ val bytes = Array.tabulate[Byte](1000)(_.toByte)
+ val byteBuffer = ByteBuffer.wrap(bytes)
+
+ val blockId = BlockId("rdd_1_2")
+ val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
+
+ val diskStoreMapped = new DiskStore(new SparkConf().set(confKey, "0"), diskBlockManager)
+ diskStoreMapped.putBytes(blockId, byteBuffer)
+ val mapped = diskStoreMapped.getBytes(blockId)
+ assert(diskStoreMapped.remove(blockId))
+
+ val diskStoreNotMapped = new DiskStore(new SparkConf().set(confKey, "1m"), diskBlockManager)
+ diskStoreNotMapped.putBytes(blockId, byteBuffer)
+ val notMapped = diskStoreNotMapped.getBytes(blockId)
+
+ // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+ assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+ "Expected HeapByteBuffer for un-mapped read")
+ assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+
+ def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+ val array = new Array[Byte](in.remaining())
+ in.get(array)
+ array
+ }
+
+ val mappedAsArray = arrayFromByteBuffer(mapped)
+ val notMappedAsArray = arrayFromByteBuffer(notMapped)
+ assert(Arrays.equals(mappedAsArray, bytes))
+ assert(Arrays.equals(notMappedAsArray, bytes))
+ }
+}
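The relocated suite also documents the behavior of the new bytes-only DiskStore: spark.storage.memoryMapThreshold decides whether getBytes returns a MappedByteBuffer (roughly, when the on-disk file is at least the threshold) or a plain heap buffer, and putBytes no longer takes a StorageLevel because the disk store only ever holds serialized bytes. A minimal round trip, assuming the same imports and package-private access as the suite (block id and sizes are arbitrary):

    val conf = new SparkConf().set("spark.storage.memoryMapThreshold", "2m")
    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
    val diskStore = new DiskStore(conf, diskBlockManager)

    val blockId = BlockId("rdd_3_4") // hypothetical block
    diskStore.putBytes(blockId, ByteBuffer.wrap(Array.tabulate[Byte](16)(_.toByte)))
    val readBack: ByteBuffer = diskStore.getBytes(blockId) // a heap buffer here: 16 bytes < 2m
    assert(diskStore.remove(blockId))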