-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala             |  20
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfo.scala        |  22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala     | 479
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala        |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala      |  32
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala     |  88
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonStore.scala     |  39
-rw-r--r--  project/MimaExcludes.scala                                          |   7
9 files changed, 362 insertions, 370 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 811610c657..315ed91f81 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[RDDBlockId]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
- def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+ def getOrCompute[T](
+ rdd: RDD[T],
+ split: Partition,
+ context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
+
val key = RDDBlockId(rdd.id, split.index)
- logDebug("Looking for partition " + key)
+ logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
@@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
- logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
+ logInfo(s"Another thread is loading $key, waiting for it to finish...")
while (loading.contains(key)) {
try {
loading.wait()
@@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
- logInfo("Finished waiting for %s".format(key))
+ logInfo(s"Finished waiting for $key")
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
@@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
- logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
+ logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
loading.add(key)
}
} else {
@@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
- logInfo("Partition %s not found, computing it".format(key))
+ logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
@@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
- logInfo("Failure to store %s".format(key))
- throw new Exception("Block manager failed to return persisted valued")
+ logInfo(s"Failure to store $key")
+ throw new SparkException("Block manager failed to return persisted value")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
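
Most of the CacheManager changes above are mechanical: .format(...) and string
concatenation become s"..." interpolation, and the untyped Exception becomes a
SparkException. A minimal standalone sketch of both patterns (the RDDBlockId and
SparkException below are simplified stand-ins, not the Spark classes):

    case class RDDBlockId(rddId: Int, splitIndex: Int)
    class SparkException(message: String) extends Exception(message)

    object InterpolationSketch {
      def main(args: Array[String]): Unit = {
        val key = RDDBlockId(0, 3)
        // Before: logInfo("Partition %s not found, computing it".format(key))
        // After: interpolated, with the embedded expression checked at compile time
        println(s"Partition $key not found, computing it")
        // A typed exception lets callers catch storage failures specifically
        try throw new SparkException("Block manager failed to return persisted value")
        catch { case e: SparkException => println("caught: " + e.getMessage) }
      }
    }
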
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index c8f397609a..22fdf73e9d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -29,9 +29,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
setInitThread()
private def setInitThread() {
- // Set current thread as init thread - waitForReady will not block this thread
- // (in case there is non trivial initialization which ends up calling waitForReady as part of
- // initialization itself)
+ /* Set current thread as init thread - waitForReady will not block this thread
+ * (in case there is non trivial initialization which ends up calling waitForReady
+ * as part of initialization itself) */
BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
}
@@ -42,7 +42,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
def waitForReady(): Boolean = {
if (pending && initThread != Thread.currentThread()) {
synchronized {
- while (pending) this.wait()
+ while (pending) {
+ this.wait()
+ }
}
}
!failed
@@ -50,8 +52,8 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
/** Mark this BlockInfo as ready (i.e. block is finished writing) */
def markReady(sizeInBytes: Long) {
- require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
- assert (pending)
+ require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
+ assert(pending)
size = sizeInBytes
BlockInfo.blockInfoInitThreads.remove(this)
synchronized {
@@ -61,7 +63,7 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
/** Mark this BlockInfo as ready but failed */
def markFailure() {
- assert (pending)
+ assert(pending)
size = BlockInfo.BLOCK_FAILED
BlockInfo.blockInfoInitThreads.remove(this)
synchronized {
@@ -71,9 +73,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
}
private object BlockInfo {
- // initThread is logically a BlockInfo field, but we store it here because
- // it's only needed while this block is in the 'pending' state and we want
- // to minimize BlockInfo's memory footprint.
+ /* initThread is logically a BlockInfo field, but we store it here because
+ * it's only needed while this block is in the 'pending' state and we want
+ * to minimize BlockInfo's memory footprint. */
private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
private val BLOCK_PENDING: Long = -1L
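
The waitForReady change wraps the wait in braces but keeps the essential idiom:
waiters loop on the condition because Object.wait can return spuriously, and the
writer flips the condition before calling notifyAll. A self-contained sketch of
that handshake with one writer and many readers (illustrative, not the BlockInfo
class itself):

    class Latch {
      private var pending = true

      def waitForReady(): Unit = synchronized {
        while (pending) { wait() }  // loop: wait() may wake spuriously
      }

      def markReady(): Unit = synchronized {
        pending = false
        notifyAll()  // wake every thread blocked in waitForReady
      }
    }

    object LatchDemo {
      def main(args: Array[String]): Unit = {
        val latch = new Latch
        val t = new Thread(new Runnable {
          def run(): Unit = { latch.waitForReady(); println("ready") }
        })
        t.start()
        latch.markReady()
        t.join()
      }
    }
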
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9cd79d262e..f52bc70751 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -28,46 +28,48 @@ import scala.util.Random
import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer
-import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
+import org.apache.spark._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._
-private[spark] sealed trait Values
-
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends Values
-private[spark] case class IteratorValues(iterator: Iterator[Any]) extends Values
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
+private[spark] sealed trait BlockValues
+private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
+private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
+private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
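
Renaming the sealed trait from Values to BlockValues is cosmetic, but the sealed
hierarchy itself is what makes the put paths below safe: the compiler can warn on
any non-exhaustive match over the three value shapes. A small sketch of that
property, using the same case classes with a toy consumer (the describe function
is hypothetical):

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    sealed trait BlockValues
    case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
    case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
    case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues

    object BlockValuesSketch {
      // Because BlockValues is sealed, dropping one of these cases would
      // produce a compiler warning, and adding a fourth subtype flags
      // every match over the trait.
      def describe(data: BlockValues): String = data match {
        case ByteBufferValues(buf)  => s"bytes: ${buf.remaining()} remaining"
        case IteratorValues(it)     => "iterator (single pass)"
        case ArrayBufferValues(arr) => s"array buffer of ${arr.length} values"
      }

      def main(args: Array[String]): Unit =
        println(describe(ByteBufferValues(ByteBuffer.allocate(16))))
    }
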
private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
val master: BlockManagerMaster,
- val defaultSerializer: Serializer,
+ defaultSerializer: Serializer,
maxMemory: Long,
- val _conf: SparkConf,
+ val conf: SparkConf,
securityManager: SecurityManager,
mapOutputTracker: MapOutputTracker)
extends Logging {
- def conf = _conf
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ val connectionManager = new ConnectionManager(0, conf, securityManager)
+
+ implicit val futureExecContext = connectionManager.futureExecContext
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
+ // Actual storage of where blocks are kept
+ private var tachyonInitialized = false
private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
- var tachyonInitialized = false
private[storage] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
- val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
+ val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
- val tachyonBlockManager = new TachyonBlockManager(
- shuffleBlockManager, tachyonStorePath, tachyonMaster)
+ val tachyonBlockManager =
+ new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
new TachyonStore(this, tachyonBlockManager)
}
@@ -79,43 +81,39 @@ private[spark] class BlockManager(
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}
- val connectionManager = new ConnectionManager(0, conf, securityManager)
- implicit val futureExecContext = connectionManager.futureExecContext
-
val blockManagerId = BlockManagerId(
executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
- val maxBytesInFlight =
- conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
+ val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
// Whether to compress broadcast variables that are stored
- val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
+ private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
// Whether to compress shuffle output that are stored
- val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
+ private val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
// Whether to compress RDD partitions that are stored serialized
- val compressRdds = conf.getBoolean("spark.rdd.compress", false)
+ private val compressRdds = conf.getBoolean("spark.rdd.compress", false)
// Whether to compress shuffle output temporarily spilled to disk
- val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
+ private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
- val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
-
- val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
+ private val slaveActor = actorSystem.actorOf(
+ Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
- // Pending re-registration action being executed asynchronously or null if none
- // is pending. Accesses should synchronize on asyncReregisterLock.
- var asyncReregisterTask: Future[Unit] = null
- val asyncReregisterLock = new Object
+ // Pending re-registration action being executed asynchronously or null if none is pending.
+ // Accesses should synchronize on asyncReregisterLock.
+ private var asyncReregisterTask: Future[Unit] = null
+ private val asyncReregisterLock = new Object
- private def heartBeat() {
+ private def heartBeat(): Unit = {
if (!master.sendHeartBeat(blockManagerId)) {
reregister()
}
}
- var heartBeatTask: Cancellable = null
+ private val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
+ private var heartBeatTask: Cancellable = null
private val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
@@ -124,11 +122,11 @@ private[spark] class BlockManager(
initialize()
- // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
- // the initialization of the compression codec until it is first used. The reason is that a Spark
- // program could be using a user-defined codec in a third party jar, which is loaded in
- // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
- // loaded yet.
+ /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+ * the initialization of the compression codec until it is first used. The reason is that a Spark
+ * program could be using a user-defined codec in a third party jar, which is loaded in
+ * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
+ * loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
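
A quick illustration of why the comment above insists on lazy: a lazy val's
right-hand side runs at first access rather than at construction, so codec
classes from user jars loaded later are still resolvable. A minimal sketch with
a stand-in Codec class:

    object LazyInitSketch {
      class Codec { println("codec constructed") }

      // The right-hand side runs on first access, not when the enclosing
      // object is initialized, mirroring the compressionCodec comment above.
      lazy val codec: Codec = new Codec

      def main(args: Array[String]): Unit = {
        println("before first use")  // no "codec constructed" yet
        codec                        // forces initialization here
        println("after first use")
      }
    }
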
/**
@@ -150,7 +148,7 @@ private[spark] class BlockManager(
* Initialize the BlockManager. Register to the BlockManagerMaster, and start the
* BlockManagerWorker actor.
*/
- private def initialize() {
+ private def initialize(): Unit = {
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
@@ -170,12 +168,12 @@ private[spark] class BlockManager(
* heart beat attempt or new block registration and another try to re-register all blocks
* will be made then.
*/
- private def reportAllBlocks() {
- logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+ private def reportAllBlocks(): Unit = {
+ logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
for ((blockId, info) <- blockInfo) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
- logError("Failed to report " + blockId + " to master; giving up.")
+ logError(s"Failed to report $blockId to master; giving up.")
return
}
}
@@ -187,7 +185,7 @@ private[spark] class BlockManager(
*
* Note that this method must be called without any BlockInfo locks held.
*/
- def reregister() {
+ private def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo("BlockManager re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
@@ -197,7 +195,7 @@ private[spark] class BlockManager(
/**
* Re-register with the master sometime soon.
*/
- def asyncReregister() {
+ private def asyncReregister(): Unit = {
asyncReregisterLock.synchronized {
if (asyncReregisterTask == null) {
asyncReregisterTask = Future[Unit] {
@@ -213,7 +211,7 @@ private[spark] class BlockManager(
/**
* For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
*/
- def waitForAsyncReregister() {
+ def waitForAsyncReregister(): Unit = {
val task = asyncReregisterTask
if (task != null) {
Await.ready(task, Duration.Inf)
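
The pair asyncReregister/waitForAsyncReregister implements an
at-most-one-pending-task pattern: a lock guards a nullable Future, and the task
clears itself when done. A simplified standalone sketch of that pattern (names
and the println are illustrative):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    class Reregistrar {
      private val lock = new Object
      private var task: Future[Unit] = null

      def asyncReregister(): Unit = lock.synchronized {
        if (task == null) {  // only start a task if none is pending
          task = Future {
            println("re-registering with master")
            lock.synchronized { task = null }  // clear when done
          }
        }
      }

      def waitForAsyncReregister(): Unit = {
        val t = task
        if (t != null) Await.ready(t, Duration.Inf)
      }
    }
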
@@ -251,18 +249,18 @@ private[spark] class BlockManager(
* it is still valid). This ensures that update in master will compensate for the increase in
* memory on slave.
*/
- def reportBlockStatus(
+ private def reportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
- droppedMemorySize: Long = 0L) {
+ droppedMemorySize: Long = 0L): Unit = {
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
if (needReregister) {
- logInfo("Got told to re-register updating block " + blockId)
+ logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
asyncReregister()
}
- logDebug("Told master about block " + blockId)
+ logDebug(s"Told master about block $blockId")
}
/**
@@ -293,10 +291,10 @@ private[spark] class BlockManager(
* and the updated in-memory and on-disk sizes.
*/
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
- val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized {
+ info.synchronized {
info.level match {
case null =>
- (StorageLevel.NONE, 0L, 0L, 0L)
+ BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
@@ -307,19 +305,18 @@ private[spark] class BlockManager(
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- (storageLevel, memSize, diskSize, tachyonSize)
+ BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
}
}
- BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize)
}
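
This refactor leans on synchronized being an expression in Scala: the
BlockStatus built on the last line of the block is the result, so the old
four-element tuple threaded out of the lock disappears. A tiny sketch of the
same shape with stand-in types:

    object SyncExprSketch {
      case class Status(level: String, memSize: Long, diskSize: Long)

      private val lock = new Object
      private var memSize = 128L
      private var diskSize = 0L

      // synchronized is an expression: the value of its last line is the
      // result, so the status is built and returned under the lock.
      def currentStatus(): Status = lock.synchronized {
        Status("MEMORY_ONLY", memSize, diskSize)
      }

      def main(args: Array[String]): Unit = println(currentStatus())
    }
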
/**
* Get locations of an array of blocks.
*/
- def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
+ private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).toArray
- logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))
locations
}
@@ -329,15 +326,16 @@ private[spark] class BlockManager(
* never deletes (recent) items.
*/
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
- diskStore.getValues(blockId, serializer).orElse(
- sys.error("Block " + blockId + " not found on disk, though it should be"))
+ diskStore.getValues(blockId, serializer).orElse {
+ throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
+ }
}
/**
* Get block from local block manager.
*/
def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
- logDebug("Getting local block " + blockId)
+ logDebug(s"Getting local block $blockId")
doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
}
@@ -345,7 +343,7 @@ private[spark] class BlockManager(
* Get block from the local block manager as serialized bytes.
*/
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
- logDebug("Getting local block " + blockId + " as bytes")
+ logDebug(s"Getting local block $blockId as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
@@ -353,7 +351,8 @@ private[spark] class BlockManager(
case Some(bytes) =>
Some(bytes)
case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ throw new BlockException(
+ blockId, s"Block $blockId not found on disk, though it should be")
}
} else {
doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
@@ -368,16 +367,16 @@ private[spark] class BlockManager(
// If another thread is writing the block, wait for it to become ready.
if (!info.waitForReady()) {
// If we get here, the block write failed.
- logWarning("Block " + blockId + " was marked as failure.")
+ logWarning(s"Block $blockId was marked as failure.")
return None
}
val level = info.level
- logDebug("Level for block " + blockId + " is " + level)
+ logDebug(s"Level for block $blockId is $level")
// Look for the block in memory
if (level.useMemory) {
- logDebug("Getting block " + blockId + " from memory")
+ logDebug(s"Getting block $blockId from memory")
val result = if (asValues) {
memoryStore.getValues(blockId)
} else {
@@ -387,51 +386,51 @@ private[spark] class BlockManager(
case Some(values) =>
return Some(values)
case None =>
- logDebug("Block " + blockId + " not found in memory")
+ logDebug(s"Block $blockId not found in memory")
}
}
// Look for the block in Tachyon
if (level.useOffHeap) {
- logDebug("Getting block " + blockId + " from tachyon")
+ logDebug(s"Getting block $blockId from tachyon")
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
- case Some(bytes) => {
+ case Some(bytes) =>
if (!asValues) {
return Some(bytes)
} else {
return Some(dataDeserialize(blockId, bytes))
}
- }
case None =>
- logDebug("Block " + blockId + " not found in tachyon")
+ logDebug(s"Block $blockId not found in tachyon")
}
}
}
- // Look for block on disk, potentially storing it back into memory if required:
+ // Look for block on disk, potentially storing it back in memory if required
if (level.useDisk) {
- logDebug("Getting block " + blockId + " from disk")
+ logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
- case Some(bytes) => bytes
+ case Some(b) => b
case None =>
- throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ throw new BlockException(
+ blockId, s"Block $blockId not found on disk, though it should be")
}
- assert (0 == bytes.position())
+ assert(0 == bytes.position())
if (!level.useMemory) {
- // If the block shouldn't be stored in memory, we can just return it:
+ // If the block shouldn't be stored in memory, we can just return it
if (asValues) {
return Some(dataDeserialize(blockId, bytes))
} else {
return Some(bytes)
}
} else {
- // Otherwise, we also have to store something in the memory store:
+ // Otherwise, we also have to store something in the memory store
if (!level.deserialized || !asValues) {
- // We'll store the bytes in memory if the block's storage level includes
- // "memory serialized", or if it should be cached as objects in memory
- // but we only requested its serialized bytes:
+ /* We'll store the bytes in memory if the block's storage level includes
+ * "memory serialized", or if it should be cached as objects in memory
+ * but we only requested its serialized bytes. */
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
@@ -442,16 +441,17 @@ private[spark] class BlockManager(
} else {
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
- // Cache the values before returning them:
+ // Cache the values before returning them
// TODO: Consider creating a putValues that also takes in a iterator?
val valuesBuffer = new ArrayBuffer[Any]
valuesBuffer ++= values
- memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
- case Left(values2) =>
- return Some(values2)
- case _ =>
- throw new Exception("Memory store did not return back an iterator")
- }
+ memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
+ match {
+ case Left(values2) =>
+ return Some(values2)
+ case _ =>
+ throw new SparkException("Memory store did not return an iterator")
+ }
} else {
return Some(values)
}
@@ -460,7 +460,7 @@ private[spark] class BlockManager(
}
}
} else {
- logDebug("Block " + blockId + " not registered locally")
+ logDebug(s"Block $blockId not registered locally")
}
None
}
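
One subtlety in doGetLocal above: the bytes read from disk are copied into a
freshly allocated buffer (copyForMemory) before going to the memory store,
because duplicate() only creates a second view over the same storage. A sketch
contrasting the two, assuming nothing beyond java.nio:

    import java.nio.ByteBuffer

    object BufferCopySketch {
      def main(args: Array[String]): Unit = {
        val bytes = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))

        // duplicate() shares the underlying storage: cheap, but mutations
        // through one view are visible through the other.
        val view = bytes.duplicate()

        // allocate + put makes an independent copy, as doGetLocal does for
        // copyForMemory before handing bytes to the memory store.
        val copy = ByteBuffer.allocate(bytes.limit())
        copy.put(bytes)
        copy.flip()     // make the copy readable again
        bytes.rewind()  // restore position after the bulk put consumed it

        bytes.put(0, 99.toByte)
        println(view.get(0))  // 99: the view saw the change
        println(copy.get(0))  // 1: the independent copy did not
      }
    }
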
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
* Get block from remote block managers.
*/
def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
- logDebug("Getting remote block " + blockId)
+ logDebug(s"Getting remote block $blockId")
doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
}
@@ -477,7 +477,7 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
- logDebug("Getting remote block " + blockId + " as bytes")
+ logDebug(s"Getting remote block $blockId as bytes")
doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
@@ -485,7 +485,7 @@ private[spark] class BlockManager(
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
- logDebug("Getting remote block " + blockId + " from " + loc)
+ logDebug(s"Getting remote block $blockId from $loc")
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
@@ -495,9 +495,9 @@ private[spark] class BlockManager(
return Some(data)
}
}
- logDebug("The value of block " + blockId + " is null")
+ logDebug(s"The value of block $blockId is null")
}
- logDebug("Block " + blockId + " not found")
+ logDebug(s"Block $blockId not found")
None
}
@@ -507,12 +507,12 @@ private[spark] class BlockManager(
def get(blockId: BlockId): Option[Iterator[Any]] = {
val local = getLocal(blockId)
if (local.isDefined) {
- logInfo("Found block %s locally".format(blockId))
+ logInfo(s"Found block $blockId locally")
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
- logInfo("Found block %s remotely".format(blockId))
+ logInfo(s"Found block $blockId remotely")
return remote
}
None
@@ -533,7 +533,6 @@ private[spark] class BlockManager(
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
}
-
iter.initialize()
iter
}
@@ -543,6 +542,7 @@ private[spark] class BlockManager(
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+ require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster)
}
@@ -562,8 +562,8 @@ private[spark] class BlockManager(
}
/**
- * Put a new block of values to the block manager. Return a list of blocks updated as a
- * result of this put.
+ * Put a new block of values to the block manager.
+ * Return a list of blocks updated as a result of this put.
*/
def put(
blockId: BlockId,
@@ -575,8 +575,8 @@ private[spark] class BlockManager(
}
/**
- * Put a new block of serialized bytes to the block manager. Return a list of blocks updated
- * as a result of this put.
+ * Put a new block of serialized bytes to the block manager.
+ * Return a list of blocks updated as a result of this put.
*/
def putBytes(
blockId: BlockId,
@@ -589,7 +589,7 @@ private[spark] class BlockManager(
private def doPut(
blockId: BlockId,
- data: Values,
+ data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
@@ -599,20 +599,18 @@ private[spark] class BlockManager(
// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- // Remember the block's storage level so that we can correctly drop it to disk if it needs
- // to be dropped right after it got put into memory. Note, however, that other threads will
- // not be able to get() this block until we call markReady on its BlockInfo.
+ /* Remember the block's storage level so that we can correctly drop it to disk if it needs
+ * to be dropped right after it got put into memory. Note, however, that other threads will
+ * not be able to get() this block until we call markReady on its BlockInfo. */
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+ logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return updatedBlocks
}
-
// TODO: So the block info exists - but previous attempt to load it (?) failed.
// What do we do now ? Retry on it ?
oldBlockOpt.get
@@ -623,10 +621,10 @@ private[spark] class BlockManager(
val startTimeMs = System.currentTimeMillis
- // If we're storing values and we need to replicate the data, we'll want access to the values,
- // but because our put will read the whole iterator, there will be no values left. For the
- // case where the put serializes data, we'll remember the bytes, above; but for the case where
- // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
+ /* If we're storing values and we need to replicate the data, we'll want access to the values,
+ * but because our put will read the whole iterator, there will be no values left. For the
+ * case where the put serializes data, we'll remember the bytes, above; but for the case where
+ * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
var valuesAfterPut: Iterator[Any] = null
// Ditto for the bytes after the put
@@ -637,78 +635,62 @@ private[spark] class BlockManager(
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
- val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
- // Duplicate doesn't copy the bytes, just creates a wrapper
- val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
- Future {
- replicate(blockId, bufferView, level)
- }
- } else {
- null
+ val replicationFuture = data match {
+ case b: ByteBufferValues if level.replication > 1 =>
+ // Duplicate doesn't copy the bytes, but just creates a wrapper
+ val bufferView = b.buffer.duplicate()
+ Future { replicate(blockId, bufferView, level) }
+ case _ => null
}
putBlockInfo.synchronized {
- logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
- + " to get into synchronized block")
+ logTrace("Put for block %s took %s to get into synchronized block"
+ .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
var marked = false
try {
- if (level.useMemory) {
- // Save it just to memory first, even if it also has useDisk set to true; we will
- // drop it to disk later if the memory store can't hold it.
- val res = data match {
- case IteratorValues(iterator) =>
- memoryStore.putValues(blockId, iterator, level, true)
- case ArrayBufferValues(array) =>
- memoryStore.putValues(blockId, array, level, true)
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- memoryStore.putBytes(blockId, bytes, level)
- }
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case Left(newIterator) => valuesAfterPut = newIterator
- }
- // Keep track of which blocks are dropped from memory
- res.droppedBlocks.foreach { block => updatedBlocks += block }
- } else if (level.useOffHeap) {
- // Save to Tachyon.
- val res = data match {
- case IteratorValues(iterator) =>
- tachyonStore.putValues(blockId, iterator, level, false)
- case ArrayBufferValues(array) =>
- tachyonStore.putValues(blockId, array, level, false)
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- tachyonStore.putBytes(blockId, bytes, level)
- }
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case _ =>
- }
- } else {
- // Save directly to disk.
- // Don't get back the bytes unless we replicate them.
- val askForBytes = level.replication > 1
-
- val res = data match {
- case IteratorValues(iterator) =>
- diskStore.putValues(blockId, iterator, level, askForBytes)
- case ArrayBufferValues(array) =>
- diskStore.putValues(blockId, array, level, askForBytes)
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- diskStore.putBytes(blockId, bytes, level)
- }
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case _ =>
+ // returnValues - Whether to return the values put
+ // blockStore - The type of storage to put these values into
+ val (returnValues, blockStore: BlockStore) = {
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ (true, memoryStore)
+ } else if (level.useOffHeap) {
+ // Use tachyon for off-heap storage
+ (false, tachyonStore)
+ } else if (level.useDisk) {
+ // Don't get back the bytes from put unless we replicate them
+ (level.replication > 1, diskStore)
+ } else {
+ assert(level == StorageLevel.NONE)
+ throw new BlockException(
+ blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
+ // Actually put the values
+ val result = data match {
+ case IteratorValues(iterator) =>
+ blockStore.putValues(blockId, iterator, level, returnValues)
+ case ArrayBufferValues(array) =>
+ blockStore.putValues(blockId, array, level, returnValues)
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ blockStore.putBytes(blockId, bytes, level)
+ }
+ size = result.size
+ result.data match {
+ case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+ case Right (newBytes) => bytesAfterPut = newBytes
+ case _ =>
+ }
+
+ // Keep track of which blocks are dropped from memory
+ if (level.useMemory) {
+ result.droppedBlocks.foreach { updatedBlocks += _ }
+ }
+
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory, tachyon, or disk store,
@@ -728,18 +710,21 @@ private[spark] class BlockManager(
// could've inserted a new BlockInfo before we remove it.
blockInfo.remove(blockId)
putBlockInfo.markFailure()
- logWarning("Putting block " + blockId + " failed")
+ logWarning(s"Putting block $blockId failed")
}
}
}
- logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
// Either we're storing bytes and we asynchronously started replication, or we're storing
// values and need to serialize and replicate them now:
if (level.replication > 1) {
data match {
- case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
- case _ => {
+ case ByteBufferValues(bytes) =>
+ if (replicationFuture != null) {
+ Await.ready(replicationFuture, Duration.Inf)
+ }
+ case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
@@ -750,20 +735,19 @@ private[spark] class BlockManager(
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
- logDebug("Put block " + blockId + " remotely took " +
- Utils.getUsedTimeMs(remoteStartTime))
- }
+ logDebug("Put block %s remotely took %s"
+ .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
BlockManager.dispose(bytesAfterPut)
if (level.replication > 1) {
- logDebug("Put for block " + blockId + " with replication took " +
- Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Putting block %s with replication took %s"
+ .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
- logDebug("Put for block " + blockId + " without replication took " +
- Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Putting block %s without replication took %s"
+ .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
updatedBlocks
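
The heart of this commit is the doPut consolidation above: three near-identical
memory/tachyon/disk branches collapse into one selection of a
(returnValues, blockStore) pair followed by a single dispatch. A simplified
sketch of the selection logic with stand-in store objects (not Spark's
BlockStore hierarchy):

    object StoreDispatchSketch {
      case class Level(useMemory: Boolean, useOffHeap: Boolean,
                       useDisk: Boolean, replication: Int)

      trait Store { def name: String }
      object MemoryStore extends Store { val name = "memory" }
      object TachyonStore extends Store { val name = "tachyon" }
      object DiskStore extends Store { val name = "disk" }

      // Pick the one store to write to, and whether the put should hand
      // the stored values back (for replication or the caller).
      def selectStore(level: Level): (Boolean, Store) = {
        if (level.useMemory) (true, MemoryStore)  // may drop to disk later
        else if (level.useOffHeap) (false, TachyonStore)
        else if (level.useDisk) (level.replication > 1, DiskStore)
        else throw new IllegalArgumentException("no storage level specified")
      }

      def main(args: Array[String]): Unit = {
        val (returnValues, store) = selectStore(Level(false, false, true, 2))
        println(s"put goes to ${store.name}, returnValues = $returnValues")
      }
    }
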
@@ -773,7 +757,7 @@ private[spark] class BlockManager(
* Replicate block to another node.
*/
@volatile var cachedPeers: Seq[BlockManagerId] = null
- private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
+ private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
@@ -782,15 +766,16 @@ private[spark] class BlockManager(
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
data.rewind()
- logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
- + data.limit() + " Bytes. To node: " + peer)
- if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
- new ConnectionManagerId(peer.host, peer.port))) {
- logError("Failed to call syncPutBlock to " + peer)
+ logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
+ s"To node: $peer")
+ val putBlock = PutBlock(blockId, data, tLevel)
+ val cmId = new ConnectionManagerId(peer.host, peer.port)
+ val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
+ if (!syncPutBlockSuccess) {
+ logError(s"Failed to call syncPutBlock to $peer")
}
- logDebug("Replicated BlockId " + blockId + " once used " +
- (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
- data.limit() + " bytes.")
+ logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
+ .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
}
}
@@ -822,17 +807,17 @@ private[spark] class BlockManager(
blockId: BlockId,
data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
- logInfo("Dropping block " + blockId + " from memory")
+ logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
// If the block has not already been dropped
- if (info != null) {
+ if (info != null) {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
if (!info.waitForReady()) {
// If we get here, the block write failed.
- logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
+ logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
}
@@ -841,10 +826,10 @@ private[spark] class BlockManager(
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo("Writing block " + blockId + " to disk")
+ logInfo(s"Writing block $blockId to disk")
data match {
case Left(elements) =>
- diskStore.putValues(blockId, elements, level, false)
+ diskStore.putValues(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
@@ -858,7 +843,7 @@ private[spark] class BlockManager(
if (blockIsRemoved) {
blockIsUpdated = true
} else {
- logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
+ logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}
val status = getCurrentBlockStatus(blockId, info)
@@ -883,7 +868,7 @@ private[spark] class BlockManager(
*/
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
- logInfo("Removing RDD " + rddId)
+ logInfo(s"Removing RDD $rddId")
val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
@@ -893,7 +878,7 @@ private[spark] class BlockManager(
* Remove all blocks belonging to the given broadcast.
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
- logInfo("Removing broadcast " + broadcastId)
+ logInfo(s"Removing broadcast $broadcastId")
val blocksToRemove = blockInfo.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
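
The collect above uses a backquoted identifier in the pattern: the backticks
around broadcastId make the pattern match for equality against the method
parameter instead of binding a fresh variable that would match everything. A
standalone sketch of the difference (BroadcastBlockId here is a simplified
stand-in):

    object BacktickPatternSketch {
      case class BroadcastBlockId(broadcastId: Long, field: String)

      def blocksFor(broadcastId: Long, keys: Seq[Any]): Seq[BroadcastBlockId] =
        keys.collect {
          // With backticks: equality test against the parameter above.
          // Without them, broadcastId would be a new binding and every
          // BroadcastBlockId would match.
          case bid @ BroadcastBlockId(`broadcastId`, _) => bid
        }

      def main(args: Array[String]): Unit = {
        val keys = Seq(BroadcastBlockId(1, "a"), BroadcastBlockId(2, "b"),
                       BroadcastBlockId(1, "piece0"), "other")
        println(blocksFor(1, keys))  // only the two ids with broadcastId == 1
      }
    }
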
@@ -904,40 +889,42 @@ private[spark] class BlockManager(
/**
* Remove a block from both memory and disk.
*/
- def removeBlock(blockId: BlockId, tellMaster: Boolean = true) {
- logInfo("Removing block " + blockId)
+ def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
+ logInfo(s"Removing block $blockId")
val info = blockInfo.get(blockId).orNull
- if (info != null) info.synchronized {
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
- if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
- logWarning("Block " + blockId + " could not be removed as it was not found in either " +
- "the disk, memory, or tachyon store")
- }
- blockInfo.remove(blockId)
- if (tellMaster && info.tellMaster) {
- val status = getCurrentBlockStatus(blockId, info)
- reportBlockStatus(blockId, info, status)
+ if (info != null) {
+ info.synchronized {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
+ if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+ logWarning(s"Block $blockId could not be removed as it was not found in either " +
+ "the disk, memory, or tachyon store")
+ }
+ blockInfo.remove(blockId)
+ if (tellMaster && info.tellMaster) {
+ val status = getCurrentBlockStatus(blockId, info)
+ reportBlockStatus(blockId, info, status)
+ }
}
} else {
// The block has already been removed; do nothing.
- logWarning("Asked to remove block " + blockId + ", which does not exist")
+ logWarning(s"Asked to remove block $blockId, which does not exist")
}
}
- private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
- logInfo("Dropping non broadcast blocks older than " + cleanupTime)
+ private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = {
+ logInfo(s"Dropping non broadcast blocks older than $cleanupTime")
dropOldBlocks(cleanupTime, !_.isBroadcast)
}
- private def dropOldBroadcastBlocks(cleanupTime: Long) {
- logInfo("Dropping broadcast blocks older than " + cleanupTime)
+ private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = {
+ logInfo(s"Dropping broadcast blocks older than $cleanupTime")
dropOldBlocks(cleanupTime, _.isBroadcast)
}
- private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) {
+ private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = {
val iterator = blockInfo.getEntrySet.iterator
while (iterator.hasNext) {
val entry = iterator.next()
@@ -945,17 +932,11 @@ private[spark] class BlockManager(
if (time < cleanupTime && shouldDrop(id)) {
info.synchronized {
val level = info.level
- if (level.useMemory) {
- memoryStore.remove(id)
- }
- if (level.useDisk) {
- diskStore.remove(id)
- }
- if (level.useOffHeap) {
- tachyonStore.remove(id)
- }
+ if (level.useMemory) { memoryStore.remove(id) }
+ if (level.useDisk) { diskStore.remove(id) }
+ if (level.useOffHeap) { tachyonStore.remove(id) }
iterator.remove()
- logInfo("Dropped block " + id)
+ logInfo(s"Dropped block $id")
}
val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
@@ -963,12 +944,14 @@ private[spark] class BlockManager(
}
}
- def shouldCompress(blockId: BlockId): Boolean = blockId match {
- case ShuffleBlockId(_, _, _) => compressShuffle
- case BroadcastBlockId(_, _) => compressBroadcast
- case RDDBlockId(_, _) => compressRdds
- case TempBlockId(_) => compressShuffleSpill
- case _ => false
+ private def shouldCompress(blockId: BlockId): Boolean = {
+ blockId match {
+ case _: ShuffleBlockId => compressShuffle
+ case _: BroadcastBlockId => compressBroadcast
+ case _: RDDBlockId => compressRdds
+ case _: TempBlockId => compressShuffleSpill
+ case _ => false
+ }
}
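
shouldCompress now uses type patterns (case _: ShuffleBlockId) instead of
extractor patterns full of placeholder underscores; the semantics are the same,
but the match no longer needs editing when a case class gains a field. A sketch
with simplified BlockId types:

    object TypePatternSketch {
      sealed trait BlockId
      case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
      case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
      case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId

      // `case _: T` matches on type without deconstructing, so adding a
      // field to ShuffleBlockId would not break this match.
      def shouldCompress(id: BlockId): Boolean = id match {
        case _: ShuffleBlockId => true
        case _: RDDBlockId     => false
        case _                 => false
      }

      def main(args: Array[String]): Unit =
        println(shouldCompress(ShuffleBlockId(0, 1, 2)))
    }
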
/**
@@ -990,7 +973,7 @@ private[spark] class BlockManager(
blockId: BlockId,
outputStream: OutputStream,
values: Iterator[Any],
- serializer: Serializer = defaultSerializer) {
+ serializer: Serializer = defaultSerializer): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
@@ -1016,16 +999,16 @@ private[spark] class BlockManager(
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()
- def getIterator = {
+ def getIterator: Iterator[Any] = {
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
serializer.newInstance().deserializeStream(stream).asIterator
}
if (blockId.isShuffle) {
- // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
- // at the beginning. The wrapping will cost some memory (compression instance
- // initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
- // wrapping lazily to save memory.
+ /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+ * at the beginning. The wrapping will cost some memory (compression instance
+ * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
+ * wrapping lazily to save memory. */
class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
lazy val proxy = f
override def hasNext: Boolean = proxy.hasNext
@@ -1037,7 +1020,7 @@ private[spark] class BlockManager(
}
}
- def stop() {
+ def stop(): Unit = {
if (heartBeatTask != null) {
heartBeatTask.cancel()
}
@@ -1059,9 +1042,9 @@ private[spark] class BlockManager(
private[spark] object BlockManager extends Logging {
- val ID_GENERATOR = new IdGenerator
+ private val ID_GENERATOR = new IdGenerator
- def getMaxMemory(conf: SparkConf): Long = {
+ private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
@@ -1078,9 +1061,9 @@ private[spark] object BlockManager extends Logging {
* waiting for the GC to find it because that could lead to huge numbers of open files. There's
* unfortunately no standard API to do this.
*/
- def dispose(buffer: ByteBuffer) {
+ def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
- logTrace("Unmapping " + buffer)
+ logTrace(s"Unmapping $buffer")
if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
}
@@ -1093,7 +1076,7 @@ private[spark] object BlockManager extends Logging {
blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {
// blockManagerMaster != null is used in tests
- assert (env != null || blockManagerMaster != null)
+ assert(env != null || blockManagerMaster != null)
val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
env.blockManager.getLocationBlockIds(blockIds)
} else {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 3a7243a1ba..2ec46d416f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -40,9 +40,9 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
- // Create one local directory for each path mentioned in spark.local.dir; then, inside this
- // directory, create multiple subdirectories that we will hash files into, in order to avoid
- // having really large inodes at the top level.
+ /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
+ * directory, create multiple subdirectories that we will hash files into, in order to avoid
+ * having really large inodes at the top level. */
private val localDirs: Array[File] = createLocalDirs()
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null
@@ -114,7 +114,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
}
private def createLocalDirs(): Array[File] = {
- logDebug("Creating local directories at root dirs '" + rootDirs + "'")
+ logDebug(s"Creating local directories at root dirs '$rootDirs'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map { rootDir =>
var foundLocalDir = false
@@ -126,21 +126,20 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
tries += 1
try {
localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- localDir = new File(rootDir, "spark-local-" + localDirId)
+ localDir = new File(rootDir, s"spark-local-$localDirId")
if (!localDir.exists) {
foundLocalDir = localDir.mkdirs()
}
} catch {
case e: Exception =>
- logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
+ logWarning(s"Attempt $tries to create local dir $localDir failed", e)
}
}
if (!foundLocalDir) {
- logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
- " attempts to create local dir in " + rootDir)
+ logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
- logInfo("Created local directory at " + localDir)
+ logInfo(s"Created local directory at $localDir")
localDir
}
}
@@ -163,7 +162,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
} catch {
case e: Exception =>
- logError("Exception while deleting local spark dir: " + localDir, e)
+ logError(s"Exception while deleting local spark dir: $localDir", e)
}
}
}
@@ -175,7 +174,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private[storage] def startShuffleBlockSender(port: Int): Int = {
shuffleSender = new ShuffleSender(port, this)
- logInfo("Created ShuffleSender binding to port : " + shuffleSender.port)
+ logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
shuffleSender.port
}
}
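
createLocalDirs bounds its retries at MAX_DIR_CREATION_ATTEMPTS and salts each
attempt with a random suffix so concurrent executors on one machine don't
collide. A standalone sketch of that loop, with illustrative limits and paths:

    import java.io.File
    import java.text.SimpleDateFormat
    import java.util.Date
    import scala.util.Random

    object LocalDirSketch {
      val MaxAttempts = 10

      def createLocalDir(rootDir: String): File = {
        val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
        val rand = new Random
        var tries = 0
        while (tries < MaxAttempts) {
          tries += 1
          val id = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
          val dir = new File(rootDir, s"spark-local-$id")
          // mkdirs() returns false if the path exists or creation failed,
          // in which case we retry with a fresh random suffix.
          if (!dir.exists && dir.mkdirs()) return dir
        }
        sys.error(s"Failed $MaxAttempts attempts to create local dir in $rootDir")
      }

      def main(args: Array[String]): Unit =
        println(createLocalDir(System.getProperty("java.io.tmpdir")))
    }
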
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 0ab9fad422..ebff0cb5ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -39,41 +39,39 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
diskManager.getBlockLocation(blockId).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
- logDebug("Attempting to put block " + blockId)
+ logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
- val channel = new FileOutputStream(file).getChannel()
+ val channel = new FileOutputStream(file).getChannel
while (bytes.remaining > 0) {
channel.write(bytes)
}
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
- return PutResult(bytes.limit(), Right(bytes.duplicate()))
+ file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
}
override def putValues(
blockId: BlockId,
values: ArrayBuffer[Any],
level: StorageLevel,
- returnValues: Boolean)
- : PutResult = {
- return putValues(blockId, values.toIterator, level, returnValues)
+ returnValues: Boolean): PutResult = {
+ putValues(blockId, values.toIterator, level, returnValues)
}
override def putValues(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- returnValues: Boolean)
- : PutResult = {
+ returnValues: Boolean): PutResult = {
- logDebug("Attempting to write values for block " + blockId)
+ logDebug(s"Attempting to write values for block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
@@ -95,7 +93,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val segment = diskManager.getBlockLocation(blockId)
- val channel = new RandomAccessFile(segment.file, "r").getChannel()
+ val channel = new RandomAccessFile(segment.file, "r").getChannel
try {
// For small files, directly read rather than memory map
@@ -131,7 +129,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
file.delete()
} else {
if (fileSegment.length < file.length()) {
- logWarning("Could not delete block associated with only a part of a file: " + blockId)
+ logWarning(s"Could not delete block associated with only a part of a file: $blockId")
}
false
}
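
putBytes duplicates the incoming buffer before draining it through the channel,
which is why the caller's position and limit survive the write. A minimal sketch
of the same duplicate-then-drain loop (file handling simplified):

    import java.io.{File, FileOutputStream}
    import java.nio.ByteBuffer

    object ChannelWriteSketch {
      def writeFully(file: File, _bytes: ByteBuffer): Unit = {
        // duplicate() shares the bytes but has its own position/limit, so
        // the loop below leaves the caller's buffer untouched.
        val bytes = _bytes.duplicate()
        val channel = new FileOutputStream(file).getChannel
        try {
          while (bytes.remaining > 0) {
            channel.write(bytes)  // may write fewer bytes than remaining
          }
        } finally {
          channel.close()
        }
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.wrap("hello".getBytes("UTF-8"))
        val f = File.createTempFile("block", ".bin")
        writeFully(f, buf)
        println(s"wrote ${f.length()} bytes; caller position is ${buf.position()}")
      }
    }
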
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 488f1ea962..084a566c48 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -24,6 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.{SizeEstimator, Utils}
+private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
+
/**
* Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
* serialized ByteBuffers.
@@ -31,15 +33,13 @@ import org.apache.spark.util.{SizeEstimator, Utils}
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
- case class Entry(value: Any, size: Long, deserialized: Boolean)
-
- private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)
+ private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
@volatile private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
- logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory)))
+ logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
@@ -101,7 +101,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else if (entry.deserialized) {
Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
} else {
- Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
+ Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
}
@@ -124,8 +124,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = entries.remove(blockId)
if (entry != null) {
currentMemory -= entry.size
- logInfo("Block %s of size %d dropped from memory (free %d)".format(
- blockId, entry.size, freeMemory))
+ logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)")
true
} else {
false
@@ -181,18 +180,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
droppedBlocks ++= freeSpaceResult.droppedBlocks
if (enoughFreeSpace) {
- val entry = new Entry(value, size, deserialized)
+ val entry = new MemoryEntry(value, size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
}
- if (deserialized) {
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
- } else {
- logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
- blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
- }
+ val valuesOrBytes = if (deserialized) "values" else "bytes"
+ logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+ blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
putSuccess = true
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
@@ -221,13 +216,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Return whether there is enough free space, along with the blocks dropped in the process.
*/
private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
- logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
- space, currentMemory, maxMemory))
+ logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
if (space > maxMemory) {
- logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
+ logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit")
return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
@@ -252,7 +246,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
if (maxMemory - (currentMemory - selectedMemory) >= space) {
- logInfo(selectedBlocks.size + " blocks selected for dropping")
+ logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one thread should be dropping
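
Renaming `Entry` to `MemoryEntry` (together with deleting TachyonStore's own private `Entry`, further down) leaves only one class of that shape in the storage package. The new class is presumably the old case class under the clearer name, i.e. something like:

    // Assumed shape of the renamed class, matching the removed Entry above:
    private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)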
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 2d8ff1194a..1e35abaab5 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -34,11 +34,11 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class StorageLevel private(
- private var useDisk_ : Boolean,
- private var useMemory_ : Boolean,
- private var useOffHeap_ : Boolean,
- private var deserialized_ : Boolean,
- private var replication_ : Int = 1)
+ private var _useDisk: Boolean,
+ private var _useMemory: Boolean,
+ private var _useOffHeap: Boolean,
+ private var _deserialized: Boolean,
+ private var _replication: Int = 1)
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
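
The switch from trailing to leading underscores is more than taste: `private var useDisk_ : Boolean` needs the awkward space before the colon so the parser does not read `_:`, whereas `_useDisk` avoids that entirely. The private-var-plus-getter idiom, shown on a toy `Flag` class:

    // A field kept private and mutable, exposed through a read-only accessor:
    class Flag(private var _enabled: Boolean) {
      def enabled: Boolean = _enabled
      private def toggle(): Unit = { _enabled = !_enabled }
    }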
@@ -48,13 +48,13 @@ class StorageLevel private(
def this() = this(false, true, false, false) // For deserialization
- def useDisk = useDisk_
- def useMemory = useMemory_
- def useOffHeap = useOffHeap_
- def deserialized = deserialized_
- def replication = replication_
+ def useDisk = _useDisk
+ def useMemory = _useMemory
+ def useOffHeap = _useOffHeap
+ def deserialized = _deserialized
+ def replication = _replication
- assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+ assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
if (useOffHeap) {
require(!useDisk, "Off-heap storage level does not support using disk")
@@ -63,8 +63,9 @@ class StorageLevel private(
require(replication == 1, "Off-heap storage level does not support multiple replication")
}
- override def clone(): StorageLevel = new StorageLevel(
- this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication)
+ override def clone(): StorageLevel = {
+ new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
+ }
override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
@@ -77,20 +78,20 @@ class StorageLevel private(
false
}
- def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0))
+ def isValid = (useMemory || useDisk || useOffHeap) && (replication > 0)
def toInt: Int = {
var ret = 0
- if (useDisk_) {
+ if (_useDisk) {
ret |= 8
}
- if (useMemory_) {
+ if (_useMemory) {
ret |= 4
}
- if (useOffHeap_) {
+ if (_useOffHeap) {
ret |= 2
}
- if (deserialized_) {
+ if (_deserialized) {
ret |= 1
}
ret
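
`toInt` packs the four booleans into one nibble (disk = 8, memory = 4, off-heap = 2, deserialized = 1), so e.g. MEMORY_AND_DISK (disk + memory + deserialized) encodes as 8 + 4 + 1 = 13:

    // Worked example of the flag encoding shared by toInt and readExternal:
    val (useDisk, useMemory, useOffHeap, deserialized) = (true, true, false, true)
    var flags = 0
    if (useDisk)      flags |= 8
    if (useMemory)    flags |= 4
    if (useOffHeap)   flags |= 2
    if (deserialized) flags |= 1
    println(flags)             // 13
    println((flags & 8) != 0)  // true: the disk bit is set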
@@ -98,32 +99,34 @@ class StorageLevel private(
override def writeExternal(out: ObjectOutput) {
out.writeByte(toInt)
- out.writeByte(replication_)
+ out.writeByte(_replication)
}
override def readExternal(in: ObjectInput) {
val flags = in.readByte()
- useDisk_ = (flags & 8) != 0
- useMemory_ = (flags & 4) != 0
- useOffHeap_ = (flags & 2) != 0
- deserialized_ = (flags & 1) != 0
- replication_ = in.readByte()
+ _useDisk = (flags & 8) != 0
+ _useMemory = (flags & 4) != 0
+ _useOffHeap = (flags & 2) != 0
+ _deserialized = (flags & 1) != 0
+ _replication = in.readByte()
}
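
The Externalizable form is therefore exactly two bytes: the flag nibble and the replication count. A standalone round-trip of the same packing, without the Externalizable plumbing:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      DataInputStream, DataOutputStream}

    val out = new ByteArrayOutputStream()
    val dos = new DataOutputStream(out)
    dos.writeByte(13)  // flags: disk | memory | deserialized
    dos.writeByte(2)   // replication
    dos.flush()

    val in = new DataInputStream(new ByteArrayInputStream(out.toByteArray))
    val flags = in.readByte()
    println(((flags & 8) != 0, (flags & 4) != 0, in.readByte()))  // (true,true,2)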
@throws(classOf[IOException])
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
- override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format(
- useDisk, useMemory, useOffHeap, deserialized, replication)
+ override def toString: String = {
+ s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
+ }
override def hashCode(): Int = toInt * 41 + replication
- def description : String = {
+
+ def description: String = {
var result = ""
result += (if (useDisk) "Disk " else "")
result += (if (useMemory) "Memory " else "")
result += (if (useOffHeap) "Tachyon " else "")
result += (if (deserialized) "Deserialized " else "Serialized ")
- result += "%sx Replicated".format(replication)
+ result += s"${replication}x Replicated"
result
}
}
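
For reference, the cleaned-up `description` renders levels like so (assuming Spark on the classpath):

    import org.apache.spark.storage.StorageLevel

    StorageLevel.MEMORY_ONLY.description           // "Memory Deserialized 1x Replicated"
    StorageLevel.MEMORY_AND_DISK_SER_2.description // "Disk Memory Serialized 2x Replicated"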
@@ -165,7 +168,7 @@ object StorageLevel {
case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
case "OFF_HEAP" => OFF_HEAP
- case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+ case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
}
/**
@@ -173,26 +176,37 @@ object StorageLevel {
* Create a new StorageLevel object without setting useOffHeap.
*/
@DeveloperApi
- def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
- deserialized: Boolean, replication: Int) = getCachedStorageLevel(
+ def apply(
+ useDisk: Boolean,
+ useMemory: Boolean,
+ useOffHeap: Boolean,
+ deserialized: Boolean,
+ replication: Int) = {
+ getCachedStorageLevel(
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
+ }
/**
* :: DeveloperApi ::
* Create a new StorageLevel object.
*/
@DeveloperApi
- def apply(useDisk: Boolean, useMemory: Boolean,
- deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel(
- new StorageLevel(useDisk, useMemory, false, deserialized, replication))
+ def apply(
+ useDisk: Boolean,
+ useMemory: Boolean,
+ deserialized: Boolean,
+ replication: Int = 1) = {
+ getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
+ }
/**
* :: DeveloperApi ::
* Create a new StorageLevel object from its integer representation.
*/
@DeveloperApi
- def apply(flags: Int, replication: Int): StorageLevel =
+ def apply(flags: Int, replication: Int): StorageLevel = {
getCachedStorageLevel(new StorageLevel(flags, replication))
+ }
/**
* :: DeveloperApi ::
@@ -205,8 +219,8 @@ object StorageLevel {
getCachedStorageLevel(obj)
}
- private[spark]
- val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+ private[spark] val storageLevelCache =
+ new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
storageLevelCache.putIfAbsent(level, level)
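
`getCachedStorageLevel` is a standard interning pattern: `putIfAbsent` is a no-op when an equal key already exists, so every equal `StorageLevel` collapses to one canonical instance, and `readResolve` above routes deserialized copies through the same cache. The pattern in isolation, with a toy `intern` over strings:

    import java.util.concurrent.ConcurrentHashMap

    val cache = new ConcurrentHashMap[String, String]()
    def intern(s: String): String = {
      cache.putIfAbsent(s, s)  // keeps the first instance for equal keys
      cache.get(s)             // always returns the canonical instance
    }
    val a = intern(new String("MEMORY_ONLY"))
    val b = intern(new String("MEMORY_ONLY"))
    println(a eq b)            // true: identical object, not just equal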
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index c37e76f893..d8ff4ff6bd 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import tachyon.client.{WriteType, ReadType}
+import tachyon.client.{ReadType, WriteType}
import org.apache.spark.Logging
import org.apache.spark.util.Utils
-import org.apache.spark.serializer.Serializer
-
-
-private class Entry(val size: Long)
-
/**
* Stores BlockManager blocks on Tachyon.
@@ -46,8 +41,8 @@ private class TachyonStore(
tachyonManager.getFile(blockId.name).length
}
- override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
- putToTachyonStore(blockId, bytes, true)
+ override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ putIntoTachyonStore(blockId, bytes, returnValues = true)
}
override def putValues(
@@ -55,7 +50,7 @@ private class TachyonStore(
values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- return putValues(blockId, values.toIterator, level, returnValues)
+ putValues(blockId, values.toIterator, level, returnValues)
}
override def putValues(
@@ -63,12 +58,12 @@ private class TachyonStore(
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- logDebug("Attempting to write values for block " + blockId)
- val _bytes = blockManager.dataSerialize(blockId, values)
- putToTachyonStore(blockId, _bytes, returnValues)
+ logDebug(s"Attempting to write values for block $blockId")
+ val bytes = blockManager.dataSerialize(blockId, values)
+ putIntoTachyonStore(blockId, bytes, returnValues)
}
- private def putToTachyonStore(
+ private def putIntoTachyonStore(
blockId: BlockId,
bytes: ByteBuffer,
returnValues: Boolean): PutResult = {
@@ -76,7 +71,7 @@ private class TachyonStore(
// duplicate does not copy buffer, so inexpensive
val byteBuffer = bytes.duplicate()
byteBuffer.rewind()
- logDebug("Attempting to put block " + blockId + " into Tachyon")
+ logDebug(s"Attempting to put block $blockId into Tachyon")
val startTime = System.currentTimeMillis
val file = tachyonManager.getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
@@ -84,7 +79,7 @@ private class TachyonStore(
os.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
- blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime)))
+ blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
if (returnValues) {
PutResult(bytes.limit(), Right(bytes.duplicate()))
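
As in MemoryStore, the buffer is duplicated so the caller's position is untouched, and `rewind()` resets the copy to the start before writing. The same pattern in isolation, as a sketch with a hypothetical `writeAll` helper, not the commit's code:

    import java.nio.ByteBuffer

    // Write a buffer's full contents without disturbing the caller's indices.
    def writeAll(bytes: ByteBuffer, write: Array[Byte] => Unit): Unit = {
      val copy = bytes.duplicate()  // shares data, independent position/limit
      copy.rewind()
      val arr = new Array[Byte](copy.remaining())
      copy.get(arr)
      write(arr)
    }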
@@ -106,10 +101,9 @@ private class TachyonStore(
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
-
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val file = tachyonManager.getFile(blockId)
- if (file == null || file.getLocationHosts().size == 0) {
+ if (file == null || file.getLocationHosts.size == 0) {
return None
}
val is = file.getInStream(ReadType.CACHE)
@@ -121,16 +115,15 @@ private class TachyonStore(
val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
buffer = ByteBuffer.wrap(bs)
if (fetchSize != size) {
- logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size +
- " is not equal to fetched size " + fetchSize)
+ logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
+ s"is not equal to fetched size $fetchSize")
return None
}
}
} catch {
- case ioe: IOException => {
- logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe)
- return None
- }
+ case ioe: IOException =>
+ logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+ return None
}
Some(buffer)
}
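
The size check exists because a single `InputStream.read` may legitimately return fewer bytes than requested; the usual alternative (a sketch of a hypothetical `readFully`, not what this commit does) is a read-fully loop:

    import java.io.InputStream

    // Keep reading until the buffer fills or EOF; returns bytes actually read.
    def readFully(is: InputStream, bs: Array[Byte]): Int = {
      var off = 0
      while (off < bs.length) {
        val n = is.read(bs, off, bs.length - off)
        if (n < 0) return off  // EOF before the buffer filled
        off += n
      }
      off
    }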
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ee629794f6..042fdfcc47 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,8 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.MemoryStore.Entry"),
+ ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+ "createZero$1")
) ++
@@ -67,7 +69,10 @@ object MimaExcludes {
) ++
MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
- MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")
+ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
+ MimaBuild.excludeSparkClass("storage.Values") ++
+ MimaBuild.excludeSparkClass("storage.Entry") ++
+ MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
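
Even private classes leave traces in the published bytecode, so renaming `Entry` trips MiMa's binary-compatibility check; these excludes record that the breakage is intentional. `MimaBuild.excludeSparkClass` presumably expands to class-level filters along these lines:

    // Hypothetical expansion: MiMa's class-level filter by name.
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.Entry")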
case v if v.startsWith("1.0") =>
Seq(
MimaBuild.excludeSparkPackage("api.java"),