aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/api.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala55
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala122
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala211
-rw-r--r--core/src/main/scala/org/apache/spark/storage/RDDInfo.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageLevel.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageUtils.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala324
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala17
17 files changed, 47 insertions, 819 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 98075cef11..77acb7052d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -243,10 +243,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
- // Generate the random name for a temp folder in external block store.
- // Add a timestamp as the suffix here to make it more safe
- val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
-
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
/**
@@ -423,8 +419,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
-
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index c115e0ff74..dad90fc220 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.{Logging, SparkEnv, SparkException, TaskContext}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.Utils
@@ -72,12 +72,6 @@ private[spark] object LocalRDDCheckpointData {
* This method is idempotent.
*/
def transformStorageLevel(level: StorageLevel): StorageLevel = {
- // If this RDD is to be cached off-heap, fail fast since we cannot provide any
- // correctness guarantees about subsequent computations after the first one
- if (level.useOffHeap) {
- throw new SparkException("Local checkpointing is not compatible with off-heap caching.")
- }
-
StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 9cd52d6c2b..fe372116f1 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -85,8 +85,6 @@ class JobData private[spark](
val numSkippedStages: Int,
val numFailedStages: Int)
-// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage
-// page ... does anybody pay attention to it?
class RDDStorageInfo private[spark](
val id: Int,
val name: String,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4479e6875a..e49d79b8ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -83,13 +83,8 @@ private[spark] class BlockManager(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
// Actual storage of where blocks are kept
- private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, memoryManager)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
- private[spark] lazy val externalBlockStore: ExternalBlockStore = {
- externalBlockStoreInitialized = true
- new ExternalBlockStore(this, executorId)
- }
memoryManager.setMemoryStore(memoryStore)
// Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -313,8 +308,7 @@ private[spark] class BlockManager(
blockInfo.asScala.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
- // Assume that block is not in external block store
- BlockStatus(info.level, memSize, diskSize, 0L)
+ BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
}
}
@@ -363,10 +357,8 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
- master.updateBlockInfo(
- blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
}
@@ -381,20 +373,17 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
- BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+ BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
- val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
- val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+ val replication = if (inMem || onDisk) level.replication else 1
val storageLevel =
- StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
+ StorageLevel(onDisk, inMem, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
- val externalBlockStoreSize =
- if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
+ BlockStatus(storageLevel, memSize, diskSize)
}
}
}
@@ -475,25 +464,6 @@ private[spark] class BlockManager(
}
}
- // Look for the block in external block store
- if (level.useOffHeap) {
- logDebug(s"Getting block $blockId from ExternalBlockStore")
- if (externalBlockStore.contains(blockId)) {
- val result = if (asBlockResult) {
- externalBlockStore.getValues(blockId)
- .map(new BlockResult(_, DataReadMethod.Memory, info.size))
- } else {
- externalBlockStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in ExternalBlockStore")
- }
- }
- }
-
// Look for block on disk, potentially storing it back in memory if required
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
@@ -786,9 +756,6 @@ private[spark] class BlockManager(
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
- } else if (putLevel.useOffHeap) {
- // Use external block store
- (false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
@@ -909,8 +876,7 @@ private[spark] class BlockManager(
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
- val tLevel = StorageLevel(
- level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
+ val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
@@ -1120,9 +1086,7 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
- val removedFromExternalBlockStore =
- if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
- if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
+ if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
@@ -1212,9 +1176,6 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
- if (externalBlockStoreInitialized) {
- externalBlockStore.clear()
- }
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index da1de11d60..0b7aa599e9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,11 +54,9 @@ class BlockManagerMaster(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long): Boolean = {
+ diskSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel,
- memSize, diskSize, externalBlockStoreSize))
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logDebug(s"Updated info of block $blockId")
res
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4db400a344..fbb3df8c3e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -59,10 +59,9 @@ class BlockManagerMasterEndpoint(
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)
- case _updateBlockInfo @ UpdateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
- context.reply(updateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
+ case _updateBlockInfo @
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
case GetLocations(blockId) =>
@@ -325,8 +324,7 @@ class BlockManagerMasterEndpoint(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long): Boolean = {
+ diskSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
@@ -343,8 +341,7 @@ class BlockManagerMasterEndpoint(
return true
}
- blockManagerInfo(blockManagerId).updateBlockInfo(
- blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
+ blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -404,17 +401,13 @@ class BlockManagerMasterEndpoint(
}
@DeveloperApi
-case class BlockStatus(
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long) {
- def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
+case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
+ def isCached: Boolean = memSize + diskSize > 0
}
@DeveloperApi
object BlockStatus {
- def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+ def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
}
private[spark] class BlockManagerInfo(
@@ -443,8 +436,7 @@ private[spark] class BlockManagerInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long) {
+ diskSize: Long) {
updateLastSeenMs()
@@ -468,7 +460,7 @@ private[spark] class BlockManagerInfo(
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
- blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+ blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
@@ -476,17 +468,11 @@ private[spark] class BlockManagerInfo(
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
- blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+ blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
- if (storageLevel.useOffHeap) {
- blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
- _blocks.put(blockId, blockStatus)
- logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
- }
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
@@ -504,11 +490,6 @@ private[spark] class BlockManagerInfo(
logInfo("Removed %s on %s on disk (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
- if (blockStatus.storageLevel.useOffHeap) {
- logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
- blockId, blockManagerId.hostPort,
- Utils.bytesToString(blockStatus.externalBlockStoreSize)))
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index f392a4a0cd..6bded92700 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -63,12 +63,11 @@ private[spark] object BlockManagerMessages {
var blockId: BlockId,
var storageLevel: StorageLevel,
var memSize: Long,
- var diskSize: Long,
- var externalBlockStoreSize: Long)
+ var diskSize: Long)
extends ToBlockManagerMaster
with Externalizable {
- def this() = this(null, null, null, 0, 0, 0) // For deserialization only
+ def this() = this(null, null, null, 0, 0) // For deserialization only
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
blockManagerId.writeExternal(out)
@@ -76,7 +75,6 @@ private[spark] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
- out.writeLong(externalBlockStoreSize)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -85,7 +83,6 @@ private[spark] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
- externalBlockStoreSize = in.readLong()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
index 2789e25b8d..0a14fcadf5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
@@ -26,8 +26,7 @@ private[spark] case class BlockUIData(
location: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long)
+ diskSize: Long)
/**
* The aggregated status of stream blocks in an executor
@@ -41,8 +40,6 @@ private[spark] case class ExecutorStreamBlockStatus(
def totalDiskSize: Long = blocks.map(_.diskSize).sum
- def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum
-
def numStreamBlocks: Int = blocks.size
}
@@ -62,7 +59,6 @@ private[spark] class BlockStatusListener extends SparkListener {
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
- val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
synchronized {
// Drop the update info if the block manager is not registered
@@ -74,8 +70,7 @@ private[spark] class BlockStatusListener extends SparkListener {
blockManagerId.hostPort,
storageLevel,
memSize,
- diskSize,
- externalBlockStoreSize)
+ diskSize)
)
} else {
// If isValid is not true, it means we should drop the block.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
index a5790e4454..e070bf658a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
@@ -30,8 +30,7 @@ case class BlockUpdatedInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long)
+ diskSize: Long)
private[spark] object BlockUpdatedInfo {
@@ -41,7 +40,6 @@ private[spark] object BlockUpdatedInfo {
updateBlockInfo.blockId,
updateBlockInfo.storageLevel,
updateBlockInfo.memSize,
- updateBlockInfo.diskSize,
- updateBlockInfo.externalBlockStoreSize)
+ updateBlockInfo.diskSize)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
deleted file mode 100644
index f39325a12d..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * An abstract class that the concrete external block manager has to inherit.
- * The class has to have a no-argument constructor, and will be initialized by init,
- * which is invoked by ExternalBlockStore. The main input parameter is blockId for all
- * the methods, which is the unique identifier for Block in one Spark application.
- *
- * The underlying external block manager should avoid any name space conflicts among multiple
- * Spark applications. For example, creating different directory for different applications
- * by randomUUID
- *
- */
-private[spark] abstract class ExternalBlockManager {
-
- protected var blockManager: BlockManager = _
-
- override def toString: String = {"External Block Store"}
-
- /**
- * Initialize a concrete block manager implementation. Subclass should initialize its internal
- * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore
- * right after the class is constructed. The function should throw IOException on failure
- *
- * @throws java.io.IOException if there is any file system failure during the initialization.
- */
- def init(blockManager: BlockManager, executorId: String): Unit = {
- this.blockManager = blockManager
- }
-
- /**
- * Drop the block from underlying external block store, if it exists..
- * @return true on successfully removing the block
- * false if the block could not be removed as it was not found
- *
- * @throws java.io.IOException if there is any file system failure in removing the block.
- */
- def removeBlock(blockId: BlockId): Boolean
-
- /**
- * Used by BlockManager to check the existence of the block in the underlying external
- * block store.
- * @return true if the block exists.
- * false if the block does not exists.
- *
- * @throws java.io.IOException if there is any file system failure in checking
- * the block existence.
- */
- def blockExists(blockId: BlockId): Boolean
-
- /**
- * Put the given block to the underlying external block store. Note that in normal case,
- * putting a block should never fail unless something wrong happens to the underlying
- * external block store, e.g., file system failure, etc. In this case, IOException
- * should be thrown.
- *
- * @throws java.io.IOException if there is any file system failure in putting the block.
- */
- def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
-
- def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
- val bytes = blockManager.dataSerialize(blockId, values)
- putBytes(blockId, bytes)
- }
-
- /**
- * Retrieve the block bytes.
- * @return Some(ByteBuffer) if the block bytes is successfully retrieved
- * None if the block does not exist in the external block store.
- *
- * @throws java.io.IOException if there is any file system failure in getting the block.
- */
- def getBytes(blockId: BlockId): Option[ByteBuffer]
-
- /**
- * Retrieve the block data.
- * @return Some(Iterator[Any]) if the block data is successfully retrieved
- * None if the block does not exist in the external block store.
- *
- * @throws java.io.IOException if there is any file system failure in getting the block.
- */
- def getValues(blockId: BlockId): Option[Iterator[_]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
-
- /**
- * Get the size of the block saved in the underlying external block store,
- * which is saved before by putBytes.
- * @return size of the block
- * 0 if the block does not exist
- *
- * @throws java.io.IOException if there is any file system failure in getting the block size.
- */
- def getSize(blockId: BlockId): Long
-
- /**
- * Clean up any information persisted in the underlying external block store,
- * e.g., the directory, files, etc,which is invoked by the shutdown hook of ExternalBlockStore
- * during system shutdown.
- *
- */
- def shutdown()
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
deleted file mode 100644
index 94883a54a7..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.Logging
-import org.apache.spark.util.{ShutdownHookManager, Utils}
-
-
-/**
- * Stores BlockManager blocks on ExternalBlockStore.
- * We capture any potential exception from underlying implementation
- * and return with the expected failure value
- */
-private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String)
- extends BlockStore(blockManager: BlockManager) with Logging {
-
- lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
-
- logInfo("ExternalBlockStore started")
-
- override def getSize(blockId: BlockId): Long = {
- try {
- externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getSize($blockId)", t)
- 0L
- }
- }
-
- override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
- putIntoExternalBlockStore(blockId, bytes, returnValues = true)
- }
-
- override def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
- }
-
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIntoExternalBlockStore(blockId, values, returnValues)
- }
-
- private def putIntoExternalBlockStore(
- blockId: BlockId,
- values: Iterator[_],
- returnValues: Boolean): PutResult = {
- logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
- // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
- try {
- val startTime = System.currentTimeMillis
- if (externalBlockManager.isDefined) {
- externalBlockManager.get.putValues(blockId, values)
- val size = getSize(blockId)
- val data = if (returnValues) {
- Left(getValues(blockId).get)
- } else {
- null
- }
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
- blockId, Utils.bytesToString(size), finishTime - startTime))
- PutResult(size, data)
- } else {
- logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- } catch {
- case NonFatal(t) =>
- logError(s"Error in putValues($blockId)", t)
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- }
-
- private def putIntoExternalBlockStore(
- blockId: BlockId,
- bytes: ByteBuffer,
- returnValues: Boolean): PutResult = {
- logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
- // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
- try {
- val startTime = System.currentTimeMillis
- if (externalBlockManager.isDefined) {
- val byteBuffer = bytes.duplicate()
- byteBuffer.rewind()
- externalBlockManager.get.putBytes(blockId, byteBuffer)
- val size = bytes.limit()
- val data = if (returnValues) {
- Right(bytes)
- } else {
- null
- }
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
- blockId, Utils.bytesToString(size), finishTime - startTime))
- PutResult(size, data)
- } else {
- logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- } catch {
- case NonFatal(t) =>
- logError(s"Error in putBytes($blockId)", t)
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- }
-
- // We assume the block is removed even if exception thrown
- override def remove(blockId: BlockId): Boolean = {
- try {
- externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
- } catch {
- case NonFatal(t) =>
- logError(s"Error in removeBlock($blockId)", t)
- true
- }
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- try {
- externalBlockManager.flatMap(_.getValues(blockId))
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getValues($blockId)", t)
- None
- }
- }
-
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- try {
- externalBlockManager.flatMap(_.getBytes(blockId))
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getBytes($blockId)", t)
- None
- }
- }
-
- override def contains(blockId: BlockId): Boolean = {
- try {
- val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
- if (!ret) {
- logInfo(s"Remove block $blockId")
- blockManager.removeBlock(blockId, true)
- }
- ret
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getBytes($blockId)", t)
- false
- }
- }
-
- // Create concrete block manager and fall back to Tachyon by default for backward compatibility.
- private def createBlkManager(): Option[ExternalBlockManager] = {
- val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME)
- .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
-
- try {
- val instance = Utils.classForName(clsName)
- .newInstance()
- .asInstanceOf[ExternalBlockManager]
- instance.init(blockManager, executorId)
- ShutdownHookManager.addShutdownHook { () =>
- logDebug("Shutdown hook called")
- externalBlockManager.map(_.shutdown())
- }
- Some(instance)
- } catch {
- case NonFatal(t) =>
- logError("Cannot initialize external block store", t)
- None
- }
- }
-}
-
-private[spark] object ExternalBlockStore extends Logging {
- val MAX_DIR_CREATION_ATTEMPTS = 10
- val SUB_DIRS_PER_DIR = "64"
- val BASE_DIR = "spark.externalBlockStore.baseDir"
- val FOLD_NAME = "spark.externalBlockStore.folderName"
- val MASTER_URL = "spark.externalBlockStore.url"
- val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
- val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager"
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 673f7ad79d..083d78b59e 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.Utils
@DeveloperApi
class RDDInfo(
@@ -37,15 +37,14 @@ class RDDInfo(
var diskSize = 0L
var externalBlockStoreSize = 0L
- def isCached: Boolean =
- (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0
+ def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0
override def toString: String = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
- "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format(
+ "MemorySize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
- bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize))
+ bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo): Int = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 703bce3e6b..38e9534251 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -150,7 +150,9 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
- val OFF_HEAP = new StorageLevel(false, false, true, false)
+
+ // Redirect to MEMORY_ONLY_SER for now.
+ val OFF_HEAP = MEMORY_ONLY_SER
/**
* :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index c4ac30092f..8e2cfb2441 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -48,14 +48,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
* non-RDD blocks for the same reason. In particular, RDD storage information is stored
* in a map indexed by the RDD ID to the following 4-tuple:
*
- * (memory size, disk size, off-heap size, storage level)
+ * (memory size, disk size, storage level)
*
* We assume that all the blocks that belong to the same RDD have the same storage level.
* This field is not relevant to non-RDD blocks, however, so the storage information for
* non-RDD blocks contains only the first 3 fields (in the same order).
*/
- private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
- private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)
+ private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
+ private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
/** Create a storage status with an initial set of blocks, leaving the source unmodified. */
def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
@@ -177,20 +177,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
/** Return the disk space used by this block manager. */
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
- /** Return the off-heap space used by this block manager. */
- def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
-
/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
/** Return the disk space used by the given RDD in this block manager in O(1) time. */
def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
- /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
- def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
-
/** Return the storage level, if any, used by the given RDD in this block manager. */
- def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
/**
* Update the relevant storage info, taking into account any existing status for this block.
@@ -199,34 +193,31 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
- val changeInExternalBlockStore =
- newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize
val level = newBlockStatus.storageLevel
// Compute new info from old info
- val (oldMem, oldDisk, oldExternalBlockStore) = blockId match {
+ val (oldMem, oldDisk) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) }
- .getOrElse((0L, 0L, 0L))
+ .map { case (mem, disk, _) => (mem, disk) }
+ .getOrElse((0L, 0L))
case _ =>
_nonRddStorageInfo
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
- val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L)
// Set the correct info
blockId match {
case RDDBlockId(rddId, _) =>
// If this RDD is no longer persisted, remove it
- if (newMem + newDisk + newExternalBlockStore == 0) {
+ if (newMem + newDisk == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level)
+ _rddStorageInfo(rddId) = (newMem, newDisk, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore)
+ _nonRddStorageInfo = (newMem, newDisk)
}
}
@@ -248,13 +239,11 @@ private[spark] object StorageUtils {
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
- val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
rddInfo.storageLevel = storageLevel
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
- rddInfo.externalBlockStoreSize = externalBlockStoreSize
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
deleted file mode 100644
index 6aa7e13901..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-import java.text.SimpleDateFormat
-import java.util.{Date, Random}
-
-import scala.util.control.NonFatal
-
-import com.google.common.io.ByteStreams
-import tachyon.{Constants, TachyonURI}
-import tachyon.client.ClientContext
-import tachyon.client.file.{TachyonFile, TachyonFileSystem}
-import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
-import tachyon.client.file.options.DeleteOptions
-import tachyon.conf.TachyonConf
-import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException}
-
-import org.apache.spark.Logging
-import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
-
-/**
- * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
- * default, one block is mapped to one file with a name given by its BlockId.
- *
- */
-private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
-
- var rootDirs: String = _
- var master: String = _
- var client: TachyonFileSystem = _
- private var subDirsPerTachyonDir: Int = _
-
- // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
- // then, inside this directory, create multiple subdirectories that we will hash files into,
- // in order to avoid having really large inodes at the top level in Tachyon.
- private var tachyonDirs: Array[TachyonFile] = _
- private var subDirs: Array[Array[TachyonFile]] = _
- private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
-
- override def init(blockManager: BlockManager, executorId: String): Unit = {
- super.init(blockManager, executorId)
- val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
- val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
-
- rootDirs = s"$storeDir/$appFolderName/$executorId"
- master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
- client = if (master != null && master != "") {
- val tachyonConf = new TachyonConf()
- tachyonConf.set(Constants.MASTER_ADDRESS, master)
- ClientContext.reset(tachyonConf)
- TachyonFileSystemFactory.get
- } else {
- null
- }
- // original implementation call System.exit, we change it to run without extblkstore support
- if (client == null) {
- logError("Failed to connect to the Tachyon as the master address is not configured")
- throw new IOException("Failed to connect to the Tachyon as the master " +
- "address is not configured")
- }
- subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories",
- ExternalBlockStore.SUB_DIRS_PER_DIR).toInt
-
- // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
- // then, inside this directory, create multiple subdirectories that we will hash files into,
- // in order to avoid having really large inodes at the top level in Tachyon.
- tachyonDirs = createTachyonDirs()
- subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
- tachyonDirs.foreach(registerShutdownDeleteDir)
- }
-
- override def toString: String = {"ExternalBlockStore-Tachyon"}
-
- override def removeBlock(blockId: BlockId): Boolean = {
- val file = getFile(blockId)
- if (fileExists(file)) {
- removeFile(file)
- true
- } else {
- false
- }
- }
-
- override def blockExists(blockId: BlockId): Boolean = {
- val file = getFile(blockId)
- fileExists(file)
- }
-
- override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
- val file = getFile(blockId)
- val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
- try {
- Utils.writeByteBuffer(bytes, os)
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
- os.cancel()
- } finally {
- os.close()
- }
- }
-
- override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
- val file = getFile(blockId)
- val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
- try {
- blockManager.dataSerializeStream(blockId, os, values)
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to put values of block $blockId into Tachyon", e)
- os.cancel()
- } finally {
- os.close()
- }
- }
-
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = getFile(blockId)
- if (file == null) {
- return None
- }
- val is = try {
- client.getInStream(file)
- } catch {
- case _: FileDoesNotExistException =>
- return None
- }
- try {
- val size = client.getInfo(file).length
- val bs = new Array[Byte](size.asInstanceOf[Int])
- ByteStreams.readFully(is, bs)
- Some(ByteBuffer.wrap(bs))
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
- None
- } finally {
- is.close()
- }
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[_]] = {
- val file = getFile(blockId)
- if (file == null) {
- return None
- }
- val is = try {
- client.getInStream(file)
- } catch {
- case _: FileDoesNotExistException =>
- return None
- }
- try {
- Some(blockManager.dataDeserializeStream(blockId, is))
- } finally {
- is.close()
- }
- }
-
- override def getSize(blockId: BlockId): Long = {
- client.getInfo(getFile(blockId.name)).length
- }
-
- def removeFile(file: TachyonFile): Unit = {
- client.delete(file)
- }
-
- def fileExists(file: TachyonFile): Boolean = {
- try {
- client.getInfo(file)
- true
- } catch {
- case _: FileDoesNotExistException => false
- }
- }
-
- def getFile(filename: String): TachyonFile = {
- // Figure out which tachyon directory it hashes to, and which subdirectory in that
- val hash = Utils.nonNegativeHash(filename)
- val dirId = hash % tachyonDirs.length
- val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir
-
- // Create the subdirectory if it doesn't already exist
- var subDir = subDirs(dirId)(subDirId)
- if (subDir == null) {
- subDir = subDirs(dirId).synchronized {
- val old = subDirs(dirId)(subDirId)
- if (old != null) {
- old
- } else {
- val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
- client.mkdir(path)
- val newDir = client.loadMetadata(path)
- subDirs(dirId)(subDirId) = newDir
- newDir
- }
- }
- }
- val filePath = new TachyonURI(s"$subDir/$filename")
- try {
- client.create(filePath)
- } catch {
- case _: FileAlreadyExistsException => client.loadMetadata(filePath)
- }
- }
-
- def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
-
- // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
- private def createTachyonDirs(): Array[TachyonFile] = {
- logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
- val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
- rootDirs.split(",").map { rootDir =>
- var foundLocalDir = false
- var tachyonDir: TachyonFile = null
- var tachyonDirId: String = null
- var tries = 0
- val rand = new Random()
- while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) {
- tries += 1
- try {
- tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
- try {
- foundLocalDir = client.mkdir(path)
- tachyonDir = client.loadMetadata(path)
- } catch {
- case _: FileAlreadyExistsException => // continue
- }
- } catch {
- case NonFatal(e) =>
- logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
- }
- }
- if (!foundLocalDir) {
- logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS
- + " attempts to create tachyon dir in " + rootDir)
- System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR)
- }
- logInfo("Created tachyon directory at " + tachyonDir)
- tachyonDir
- }
- }
-
- override def shutdown() {
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
- deleteRecursively(tachyonDir, client)
- }
- } catch {
- case NonFatal(e) =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
- }
- }
- }
-
- /**
- * Delete a file or directory and its contents recursively.
- */
- private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
- client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
- }
-
- // Register the tachyon path to be deleted via shutdown hook
- private def registerShutdownDeleteDir(file: TachyonFile) {
- val absolutePath = client.getInfo(file).getPath
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths += absolutePath
- }
- }
-
- // Remove the tachyon path to be deleted via shutdown hook
- private def removeShutdownDeleteDir(file: TachyonFile) {
- val absolutePath = client.getInfo(file).getPath
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths -= absolutePath
- }
- }
-
- // Is the path already registered to be deleted via a shutdown hook ?
- private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
- val absolutePath = client.getInfo(file).getPath
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.contains(absolutePath)
- }
- }
-
- // Note: if file is child of some registered path, while not equal to it, then return true;
- // else false. This is to ensure that two shutdown hooks do not try to delete each others
- // paths - resulting in Exception and incomplete cleanup.
- private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
- val absolutePath = client.getInfo(file).getPath
- val hasRoot = shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.exists(
- path => !absolutePath.equals(path) && absolutePath.startsWith(path))
- }
- if (hasRoot) {
- logInfo(s"path = $absolutePath, already present as root for deletion.")
- }
- hasRoot
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 04f584621e..c9bb49b83e 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -54,7 +54,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
- "Size in ExternalBlockStore",
"Size on Disk")
/** Render an HTML row representing an RDD */
@@ -71,7 +70,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
<td>{rdd.numCachedPartitions.toString}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
- <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
<td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
// scalastyle:on
@@ -104,7 +102,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Executor ID",
"Address",
"Total Size in Memory",
- "Total Size in ExternalBlockStore",
"Total Size on Disk",
"Stream Blocks")
@@ -119,9 +116,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
<td sorttable_customkey={status.totalMemSize.toString}>
{Utils.bytesToString(status.totalMemSize)}
</td>
- <td sorttable_customkey={status.totalExternalBlockStoreSize.toString}>
- {Utils.bytesToString(status.totalExternalBlockStoreSize)}
- </td>
<td sorttable_customkey={status.totalDiskSize.toString}>
{Utils.bytesToString(status.totalDiskSize)}
</td>
@@ -195,8 +189,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
("Memory", block.memSize)
} else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) {
("Memory Serialized", block.memSize)
- } else if (block.storageLevel.useOffHeap) {
- ("External", block.externalBlockStoreSize)
} else {
throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}")
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a62fd2f339..a6460bc8b8 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -409,14 +409,12 @@ private[spark] object JsonProtocol {
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
- ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~
("Disk Size" -> rddInfo.diskSize)
}
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
- ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
@@ -425,7 +423,6 @@ private[spark] object JsonProtocol {
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
("Storage Level" -> storageLevel) ~
("Memory Size" -> blockStatus.memSize) ~
- ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~
("Disk Size" -> blockStatus.diskSize)
}
@@ -867,15 +864,11 @@ private[spark] object JsonProtocol {
val numPartitions = (json \ "Number of Partitions").extract[Int]
val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
val memSize = (json \ "Memory Size").extract[Long]
- // fallback to tachyon for backward compatibility
- val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
- .getOrElse(json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
- rddInfo.externalBlockStoreSize = externalBlockStoreSize
rddInfo.diskSize = diskSize
rddInfo
}
@@ -883,22 +876,16 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
- // fallback to tachyon for backward compatability
- val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome
- .getOrElse(json \ "Use Tachyon").extract[Boolean]
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
- StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication)
+ StorageLevel(useDisk, useMemory, deserialized, replication)
}
def blockStatusFromJson(json: JValue): BlockStatus = {
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val memorySize = (json \ "Memory Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
- // fallback to tachyon for backward compatability
- val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
- .getOrElse(json \ "Tachyon Size").extract[Long]
- BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize)
+ BlockStatus(storageLevel, memorySize, diskSize)
}
def executorInfoFromJson(json: JValue): ExecutorInfo = {