diff options
author | Reynold Xin <rxin@databricks.com> | 2016-01-15 12:03:28 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-01-15 12:03:28 -0800 |
commit | ad1503f92e1f6e960a24f9f5d36b1735d1f5073a (patch) | |
tree | 564c1b3b8bea707d98d9467a7bbba87845051908 /core/src/main | |
parent | 5f83c6991c95616ecbc2878f8860c69b2826f56c (diff) | |
download | spark-ad1503f92e1f6e960a24f9f5d36b1735d1f5073a.tar.gz spark-ad1503f92e1f6e960a24f9f5d36b1735d1f5073a.tar.bz2 spark-ad1503f92e1f6e960a24f9f5d36b1735d1f5073a.zip |
[SPARK-12667] Remove block manager's internal "external block store" API
This pull request removes the external block store API. This is rarely used, and the file system interface is actually a better, more standard way to interact with external storage systems.
There are some other things to remove also, as pointed out by JoshRosen. We will do those as follow-up pull requests.
Author: Reynold Xin <rxin@databricks.com>
Closes #10752 from rxin/remove-offheap.
Diffstat (limited to 'core/src/main')
17 files changed, 47 insertions, 819 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 98075cef11..77acb7052d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -243,10 +243,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def eventLogDir: Option[URI] = _eventLogDir private[spark] def eventLogCodec: Option[String] = _eventLogCodec - // Generate the random name for a temp folder in external block store. - // Add a timestamp as the suffix here to make it more safe - val externalBlockStoreFolderName = "spark-" + randomUUID.toString() - def isLocal: Boolean = (master == "local" || master.startsWith("local[")) /** @@ -423,8 +419,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName) - if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") // "_jobProgressListener" should be set up before creating SparkEnv because when creating diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala index c115e0ff74..dad90fc220 100644 --- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala @@ -19,7 +19,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Logging, SparkEnv, SparkException, TaskContext} +import org.apache.spark.{Logging, SparkEnv, TaskContext} import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.Utils @@ -72,12 +72,6 @@ private[spark] object LocalRDDCheckpointData { * This method is idempotent. */ def transformStorageLevel(level: StorageLevel): StorageLevel = { - // If this RDD is to be cached off-heap, fail fast since we cannot provide any - // correctness guarantees about subsequent computations after the first one - if (level.useOffHeap) { - throw new SparkException("Local checkpointing is not compatible with off-heap caching.") - } - StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication) } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 9cd52d6c2b..fe372116f1 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -85,8 +85,6 @@ class JobData private[spark]( val numSkippedStages: Int, val numFailedStages: Int) -// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage -// page ... does anybody pay attention to it? class RDDStorageInfo private[spark]( val id: Int, val name: String, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4479e6875a..e49d79b8ad 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -83,13 +83,8 @@ private[spark] class BlockManager( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) // Actual storage of where blocks are kept - private var externalBlockStoreInitialized = false private[spark] val memoryStore = new MemoryStore(this, memoryManager) private[spark] val diskStore = new DiskStore(this, diskBlockManager) - private[spark] lazy val externalBlockStore: ExternalBlockStore = { - externalBlockStoreInitialized = true - new ExternalBlockStore(this, executorId) - } memoryManager.setMemoryStore(memoryStore) // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time. @@ -313,8 +308,7 @@ private[spark] class BlockManager( blockInfo.asScala.get(blockId).map { info => val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L - // Assume that block is not in external block store - BlockStatus(info.level, memSize, diskSize, 0L) + BlockStatus(info.level, memSize = memSize, diskSize = diskSize) } } @@ -363,10 +357,8 @@ private[spark] class BlockManager( if (info.tellMaster) { val storageLevel = status.storageLevel val inMemSize = Math.max(status.memSize, droppedMemorySize) - val inExternalBlockStoreSize = status.externalBlockStoreSize val onDiskSize = status.diskSize - master.updateBlockInfo( - blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize) + master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) } else { true } @@ -381,20 +373,17 @@ private[spark] class BlockManager( info.synchronized { info.level match { case null => - BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) + BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) - val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val deserialized = if (inMem) level.deserialized else false - val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1 + val replication = if (inMem || onDisk) level.replication else 1 val storageLevel = - StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication) + StorageLevel(onDisk, inMem, deserialized, replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L - val externalBlockStoreSize = - if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize) + BlockStatus(storageLevel, memSize, diskSize) } } } @@ -475,25 +464,6 @@ private[spark] class BlockManager( } } - // Look for the block in external block store - if (level.useOffHeap) { - logDebug(s"Getting block $blockId from ExternalBlockStore") - if (externalBlockStore.contains(blockId)) { - val result = if (asBlockResult) { - externalBlockStore.getValues(blockId) - .map(new BlockResult(_, DataReadMethod.Memory, info.size)) - } else { - externalBlockStore.getBytes(blockId) - } - result match { - case Some(values) => - return result - case None => - logDebug(s"Block $blockId not found in ExternalBlockStore") - } - } - } - // Look for block on disk, potentially storing it back in memory if required if (level.useDisk) { logDebug(s"Getting block $blockId from disk") @@ -786,9 +756,6 @@ private[spark] class BlockManager( // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. (true, memoryStore) - } else if (putLevel.useOffHeap) { - // Use external block store - (false, externalBlockStore) } else if (putLevel.useDisk) { // Don't get back the bytes from put unless we replicate them (putLevel.replication > 1, diskStore) @@ -909,8 +876,7 @@ private[spark] class BlockManager( val peersForReplication = new ArrayBuffer[BlockManagerId] val peersReplicatedTo = new ArrayBuffer[BlockManagerId] val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] - val tLevel = StorageLevel( - level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) + val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) val startTime = System.currentTimeMillis val random = new Random(blockId.hashCode) @@ -1120,9 +1086,7 @@ private[spark] class BlockManager( // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) - val removedFromExternalBlockStore = - if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false - if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) { + if (!removedFromMemory && !removedFromDisk) { logWarning(s"Block $blockId could not be removed as it was not found in either " + "the disk, memory, or external block store") } @@ -1212,9 +1176,6 @@ private[spark] class BlockManager( blockInfo.clear() memoryStore.clear() diskStore.clear() - if (externalBlockStoreInitialized) { - externalBlockStore.clear() - } futureExecutionContext.shutdownNow() logInfo("BlockManager stopped") } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index da1de11d60..0b7aa599e9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -54,11 +54,9 @@ class BlockManagerMaster( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long, - externalBlockStoreSize: Long): Boolean = { + diskSize: Long): Boolean = { val res = driverEndpoint.askWithRetry[Boolean]( - UpdateBlockInfo(blockManagerId, blockId, storageLevel, - memSize, diskSize, externalBlockStoreSize)) + UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) logDebug(s"Updated info of block $blockId") res } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 4db400a344..fbb3df8c3e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -59,10 +59,9 @@ class BlockManagerMasterEndpoint( register(blockManagerId, maxMemSize, slaveEndpoint) context.reply(true) - case _updateBlockInfo @ UpdateBlockInfo( - blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) => - context.reply(updateBlockInfo( - blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize)) + case _updateBlockInfo @ + UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)) listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo))) case GetLocations(blockId) => @@ -325,8 +324,7 @@ class BlockManagerMasterEndpoint( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long, - externalBlockStoreSize: Long): Boolean = { + diskSize: Long): Boolean = { if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { @@ -343,8 +341,7 @@ class BlockManagerMasterEndpoint( return true } - blockManagerInfo(blockManagerId).updateBlockInfo( - blockId, storageLevel, memSize, diskSize, externalBlockStoreSize) + blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) var locations: mutable.HashSet[BlockManagerId] = null if (blockLocations.containsKey(blockId)) { @@ -404,17 +401,13 @@ class BlockManagerMasterEndpoint( } @DeveloperApi -case class BlockStatus( - storageLevel: StorageLevel, - memSize: Long, - diskSize: Long, - externalBlockStoreSize: Long) { - def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0 +case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) { + def isCached: Boolean = memSize + diskSize > 0 } @DeveloperApi object BlockStatus { - def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) + def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L) } private[spark] class BlockManagerInfo( @@ -443,8 +436,7 @@ private[spark] class BlockManagerInfo( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long, - externalBlockStoreSize: Long) { + diskSize: Long) { updateLastSeenMs() @@ -468,7 +460,7 @@ private[spark] class BlockManagerInfo( * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */ var blockStatus: BlockStatus = null if (storageLevel.useMemory) { - blockStatus = BlockStatus(storageLevel, memSize, 0, 0) + blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0) _blocks.put(blockId, blockStatus) _remainingMem -= memSize logInfo("Added %s in memory on %s (size: %s, free: %s)".format( @@ -476,17 +468,11 @@ private[spark] class BlockManagerInfo( Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { - blockStatus = BlockStatus(storageLevel, 0, diskSize, 0) + blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize) _blocks.put(blockId, blockStatus) logInfo("Added %s on disk on %s (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } - if (storageLevel.useOffHeap) { - blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize) - _blocks.put(blockId, blockStatus) - logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize))) - } if (!blockId.isBroadcast && blockStatus.isCached) { _cachedBlocks += blockId } @@ -504,11 +490,6 @@ private[spark] class BlockManagerInfo( logInfo("Removed %s on %s on disk (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) } - if (blockStatus.storageLevel.useOffHeap) { - logInfo("Removed %s on %s on externalBlockStore (size: %s)".format( - blockId, blockManagerId.hostPort, - Utils.bytesToString(blockStatus.externalBlockStoreSize))) - } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index f392a4a0cd..6bded92700 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -63,12 +63,11 @@ private[spark] object BlockManagerMessages { var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, - var diskSize: Long, - var externalBlockStoreSize: Long) + var diskSize: Long) extends ToBlockManagerMaster with Externalizable { - def this() = this(null, null, null, 0, 0, 0) // For deserialization only + def this() = this(null, null, null, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { blockManagerId.writeExternal(out) @@ -76,7 +75,6 @@ private[spark] object BlockManagerMessages { storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) - out.writeLong(externalBlockStoreSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -85,7 +83,6 @@ private[spark] object BlockManagerMessages { storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() - externalBlockStoreSize = in.readLong() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala index 2789e25b8d..0a14fcadf5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala @@ -26,8 +26,7 @@ private[spark] case class BlockUIData( location: String, storageLevel: StorageLevel, memSize: Long, - diskSize: Long, - externalBlockStoreSize: Long) + diskSize: Long) /** * The aggregated status of stream blocks in an executor @@ -41,8 +40,6 @@ private[spark] case class ExecutorStreamBlockStatus( def totalDiskSize: Long = blocks.map(_.diskSize).sum - def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum - def numStreamBlocks: Int = blocks.size } @@ -62,7 +59,6 @@ private[spark] class BlockStatusListener extends SparkListener { val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel val memSize = blockUpdated.blockUpdatedInfo.memSize val diskSize = blockUpdated.blockUpdatedInfo.diskSize - val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize synchronized { // Drop the update info if the block manager is not registered @@ -74,8 +70,7 @@ private[spark] class BlockStatusListener extends SparkListener { blockManagerId.hostPort, storageLevel, memSize, - diskSize, - externalBlockStoreSize) + diskSize) ) } else { // If isValid is not true, it means we should drop the block. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala index a5790e4454..e070bf658a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala @@ -30,8 +30,7 @@ case class BlockUpdatedInfo( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long, - externalBlockStoreSize: Long) + diskSize: Long) private[spark] object BlockUpdatedInfo { @@ -41,7 +40,6 @@ private[spark] object BlockUpdatedInfo { updateBlockInfo.blockId, updateBlockInfo.storageLevel, updateBlockInfo.memSize, - updateBlockInfo.diskSize, - updateBlockInfo.externalBlockStoreSize) + updateBlockInfo.diskSize) } } diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala deleted file mode 100644 index f39325a12d..0000000000 --- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import java.nio.ByteBuffer - -/** - * An abstract class that the concrete external block manager has to inherit. - * The class has to have a no-argument constructor, and will be initialized by init, - * which is invoked by ExternalBlockStore. The main input parameter is blockId for all - * the methods, which is the unique identifier for Block in one Spark application. - * - * The underlying external block manager should avoid any name space conflicts among multiple - * Spark applications. For example, creating different directory for different applications - * by randomUUID - * - */ -private[spark] abstract class ExternalBlockManager { - - protected var blockManager: BlockManager = _ - - override def toString: String = {"External Block Store"} - - /** - * Initialize a concrete block manager implementation. Subclass should initialize its internal - * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore - * right after the class is constructed. The function should throw IOException on failure - * - * @throws java.io.IOException if there is any file system failure during the initialization. - */ - def init(blockManager: BlockManager, executorId: String): Unit = { - this.blockManager = blockManager - } - - /** - * Drop the block from underlying external block store, if it exists.. - * @return true on successfully removing the block - * false if the block could not be removed as it was not found - * - * @throws java.io.IOException if there is any file system failure in removing the block. - */ - def removeBlock(blockId: BlockId): Boolean - - /** - * Used by BlockManager to check the existence of the block in the underlying external - * block store. - * @return true if the block exists. - * false if the block does not exists. - * - * @throws java.io.IOException if there is any file system failure in checking - * the block existence. - */ - def blockExists(blockId: BlockId): Boolean - - /** - * Put the given block to the underlying external block store. Note that in normal case, - * putting a block should never fail unless something wrong happens to the underlying - * external block store, e.g., file system failure, etc. In this case, IOException - * should be thrown. - * - * @throws java.io.IOException if there is any file system failure in putting the block. - */ - def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit - - def putValues(blockId: BlockId, values: Iterator[_]): Unit = { - val bytes = blockManager.dataSerialize(blockId, values) - putBytes(blockId, bytes) - } - - /** - * Retrieve the block bytes. - * @return Some(ByteBuffer) if the block bytes is successfully retrieved - * None if the block does not exist in the external block store. - * - * @throws java.io.IOException if there is any file system failure in getting the block. - */ - def getBytes(blockId: BlockId): Option[ByteBuffer] - - /** - * Retrieve the block data. - * @return Some(Iterator[Any]) if the block data is successfully retrieved - * None if the block does not exist in the external block store. - * - * @throws java.io.IOException if there is any file system failure in getting the block. - */ - def getValues(blockId: BlockId): Option[Iterator[_]] = { - getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) - } - - /** - * Get the size of the block saved in the underlying external block store, - * which is saved before by putBytes. - * @return size of the block - * 0 if the block does not exist - * - * @throws java.io.IOException if there is any file system failure in getting the block size. - */ - def getSize(blockId: BlockId): Long - - /** - * Clean up any information persisted in the underlying external block store, - * e.g., the directory, files, etc,which is invoked by the shutdown hook of ExternalBlockStore - * during system shutdown. - * - */ - def shutdown() -} diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala deleted file mode 100644 index 94883a54a7..0000000000 --- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import java.nio.ByteBuffer - -import scala.util.control.NonFatal - -import org.apache.spark.Logging -import org.apache.spark.util.{ShutdownHookManager, Utils} - - -/** - * Stores BlockManager blocks on ExternalBlockStore. - * We capture any potential exception from underlying implementation - * and return with the expected failure value - */ -private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String) - extends BlockStore(blockManager: BlockManager) with Logging { - - lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager() - - logInfo("ExternalBlockStore started") - - override def getSize(blockId: BlockId): Long = { - try { - externalBlockManager.map(_.getSize(blockId)).getOrElse(0) - } catch { - case NonFatal(t) => - logError(s"Error in getSize($blockId)", t) - 0L - } - } - - override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = { - putIntoExternalBlockStore(blockId, bytes, returnValues = true) - } - - override def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIntoExternalBlockStore(blockId, values.toIterator, returnValues) - } - - override def putIterator( - blockId: BlockId, - values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIntoExternalBlockStore(blockId, values, returnValues) - } - - private def putIntoExternalBlockStore( - blockId: BlockId, - values: Iterator[_], - returnValues: Boolean): PutResult = { - logTrace(s"Attempting to put block $blockId into ExternalBlockStore") - // we should never hit here if externalBlockManager is None. Handle it anyway for safety. - try { - val startTime = System.currentTimeMillis - if (externalBlockManager.isDefined) { - externalBlockManager.get.putValues(blockId, values) - val size = getSize(blockId) - val data = if (returnValues) { - Left(getValues(blockId).get) - } else { - null - } - val finishTime = System.currentTimeMillis - logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format( - blockId, Utils.bytesToString(size), finishTime - startTime)) - PutResult(size, data) - } else { - logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured") - PutResult(-1, null, Seq((blockId, BlockStatus.empty))) - } - } catch { - case NonFatal(t) => - logError(s"Error in putValues($blockId)", t) - PutResult(-1, null, Seq((blockId, BlockStatus.empty))) - } - } - - private def putIntoExternalBlockStore( - blockId: BlockId, - bytes: ByteBuffer, - returnValues: Boolean): PutResult = { - logTrace(s"Attempting to put block $blockId into ExternalBlockStore") - // we should never hit here if externalBlockManager is None. Handle it anyway for safety. - try { - val startTime = System.currentTimeMillis - if (externalBlockManager.isDefined) { - val byteBuffer = bytes.duplicate() - byteBuffer.rewind() - externalBlockManager.get.putBytes(blockId, byteBuffer) - val size = bytes.limit() - val data = if (returnValues) { - Right(bytes) - } else { - null - } - val finishTime = System.currentTimeMillis - logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format( - blockId, Utils.bytesToString(size), finishTime - startTime)) - PutResult(size, data) - } else { - logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured") - PutResult(-1, null, Seq((blockId, BlockStatus.empty))) - } - } catch { - case NonFatal(t) => - logError(s"Error in putBytes($blockId)", t) - PutResult(-1, null, Seq((blockId, BlockStatus.empty))) - } - } - - // We assume the block is removed even if exception thrown - override def remove(blockId: BlockId): Boolean = { - try { - externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true) - } catch { - case NonFatal(t) => - logError(s"Error in removeBlock($blockId)", t) - true - } - } - - override def getValues(blockId: BlockId): Option[Iterator[Any]] = { - try { - externalBlockManager.flatMap(_.getValues(blockId)) - } catch { - case NonFatal(t) => - logError(s"Error in getValues($blockId)", t) - None - } - } - - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - try { - externalBlockManager.flatMap(_.getBytes(blockId)) - } catch { - case NonFatal(t) => - logError(s"Error in getBytes($blockId)", t) - None - } - } - - override def contains(blockId: BlockId): Boolean = { - try { - val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false) - if (!ret) { - logInfo(s"Remove block $blockId") - blockManager.removeBlock(blockId, true) - } - ret - } catch { - case NonFatal(t) => - logError(s"Error in getBytes($blockId)", t) - false - } - } - - // Create concrete block manager and fall back to Tachyon by default for backward compatibility. - private def createBlkManager(): Option[ExternalBlockManager] = { - val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME) - .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME) - - try { - val instance = Utils.classForName(clsName) - .newInstance() - .asInstanceOf[ExternalBlockManager] - instance.init(blockManager, executorId) - ShutdownHookManager.addShutdownHook { () => - logDebug("Shutdown hook called") - externalBlockManager.map(_.shutdown()) - } - Some(instance) - } catch { - case NonFatal(t) => - logError("Cannot initialize external block store", t) - None - } - } -} - -private[spark] object ExternalBlockStore extends Logging { - val MAX_DIR_CREATION_ATTEMPTS = 10 - val SUB_DIRS_PER_DIR = "64" - val BASE_DIR = "spark.externalBlockStore.baseDir" - val FOLD_NAME = "spark.externalBlockStore.folderName" - val MASTER_URL = "spark.externalBlockStore.url" - val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager" - val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager" -} diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 673f7ad79d..083d78b59e 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDD, RDDOperationScope} -import org.apache.spark.util.{CallSite, Utils} +import org.apache.spark.util.Utils @DeveloperApi class RDDInfo( @@ -37,15 +37,14 @@ class RDDInfo( var diskSize = 0L var externalBlockStoreSize = 0L - def isCached: Boolean = - (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0 + def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0 override def toString: String = { import Utils.bytesToString ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " + - "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format( + "MemorySize: %s; DiskSize: %s").format( name, id, storageLevel.toString, numCachedPartitions, numPartitions, - bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize)) + bytesToString(memSize), bytesToString(diskSize)) } override def compare(that: RDDInfo): Int = { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 703bce3e6b..38e9534251 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -150,7 +150,9 @@ object StorageLevel { val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) - val OFF_HEAP = new StorageLevel(false, false, true, false) + + // Redirect to MEMORY_ONLY_SER for now. + val OFF_HEAP = MEMORY_ONLY_SER /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index c4ac30092f..8e2cfb2441 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -48,14 +48,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { * non-RDD blocks for the same reason. In particular, RDD storage information is stored * in a map indexed by the RDD ID to the following 4-tuple: * - * (memory size, disk size, off-heap size, storage level) + * (memory size, disk size, storage level) * * We assume that all the blocks that belong to the same RDD have the same storage level. * This field is not relevant to non-RDD blocks, however, so the storage information for * non-RDD blocks contains only the first 3 fields (in the same order). */ - private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)] - private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L) + private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)] + private var _nonRddStorageInfo: (Long, Long) = (0L, 0L) /** Create a storage status with an initial set of blocks, leaving the source unmodified. */ def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) { @@ -177,20 +177,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { /** Return the disk space used by this block manager. */ def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum - /** Return the off-heap space used by this block manager. */ - def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum - /** Return the memory used by the given RDD in this block manager in O(1) time. */ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L) /** Return the disk space used by the given RDD in this block manager in O(1) time. */ def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L) - /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */ - def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L) - /** Return the storage level, if any, used by the given RDD in this block manager. */ - def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4) + def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3) /** * Update the relevant storage info, taking into account any existing status for this block. @@ -199,34 +193,31 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty) val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize - val changeInExternalBlockStore = - newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize val level = newBlockStatus.storageLevel // Compute new info from old info - val (oldMem, oldDisk, oldExternalBlockStore) = blockId match { + val (oldMem, oldDisk) = blockId match { case RDDBlockId(rddId, _) => _rddStorageInfo.get(rddId) - .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) } - .getOrElse((0L, 0L, 0L)) + .map { case (mem, disk, _) => (mem, disk) } + .getOrElse((0L, 0L)) case _ => _nonRddStorageInfo } val newMem = math.max(oldMem + changeInMem, 0L) val newDisk = math.max(oldDisk + changeInDisk, 0L) - val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L) // Set the correct info blockId match { case RDDBlockId(rddId, _) => // If this RDD is no longer persisted, remove it - if (newMem + newDisk + newExternalBlockStore == 0) { + if (newMem + newDisk == 0) { _rddStorageInfo.remove(rddId) } else { - _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level) + _rddStorageInfo(rddId) = (newMem, newDisk, level) } case _ => - _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore) + _nonRddStorageInfo = (newMem, newDisk) } } @@ -248,13 +239,11 @@ private[spark] object StorageUtils { val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum val memSize = statuses.map(_.memUsedByRdd(rddId)).sum val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum - val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum rddInfo.storageLevel = storageLevel rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize rddInfo.diskSize = diskSize - rddInfo.externalBlockStoreSize = externalBlockStoreSize } } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala deleted file mode 100644 index 6aa7e13901..0000000000 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import java.io.IOException -import java.nio.ByteBuffer -import java.text.SimpleDateFormat -import java.util.{Date, Random} - -import scala.util.control.NonFatal - -import com.google.common.io.ByteStreams -import tachyon.{Constants, TachyonURI} -import tachyon.client.ClientContext -import tachyon.client.file.{TachyonFile, TachyonFileSystem} -import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory -import tachyon.client.file.options.DeleteOptions -import tachyon.conf.TachyonConf -import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException} - -import org.apache.spark.Logging -import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.util.Utils - -/** - * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By - * default, one block is mapped to one file with a name given by its BlockId. - * - */ -private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging { - - var rootDirs: String = _ - var master: String = _ - var client: TachyonFileSystem = _ - private var subDirsPerTachyonDir: Int = _ - - // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; - // then, inside this directory, create multiple subdirectories that we will hash files into, - // in order to avoid having really large inodes at the top level in Tachyon. - private var tachyonDirs: Array[TachyonFile] = _ - private var subDirs: Array[Array[TachyonFile]] = _ - private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() - - override def init(blockManager: BlockManager, executorId: String): Unit = { - super.init(blockManager, executorId) - val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon") - val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME) - - rootDirs = s"$storeDir/$appFolderName/$executorId" - master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998") - client = if (master != null && master != "") { - val tachyonConf = new TachyonConf() - tachyonConf.set(Constants.MASTER_ADDRESS, master) - ClientContext.reset(tachyonConf) - TachyonFileSystemFactory.get - } else { - null - } - // original implementation call System.exit, we change it to run without extblkstore support - if (client == null) { - logError("Failed to connect to the Tachyon as the master address is not configured") - throw new IOException("Failed to connect to the Tachyon as the master " + - "address is not configured") - } - subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories", - ExternalBlockStore.SUB_DIRS_PER_DIR).toInt - - // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; - // then, inside this directory, create multiple subdirectories that we will hash files into, - // in order to avoid having really large inodes at the top level in Tachyon. - tachyonDirs = createTachyonDirs() - subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) - tachyonDirs.foreach(registerShutdownDeleteDir) - } - - override def toString: String = {"ExternalBlockStore-Tachyon"} - - override def removeBlock(blockId: BlockId): Boolean = { - val file = getFile(blockId) - if (fileExists(file)) { - removeFile(file) - true - } else { - false - } - } - - override def blockExists(blockId: BlockId): Boolean = { - val file = getFile(blockId) - fileExists(file) - } - - override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = { - val file = getFile(blockId) - val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath)) - try { - Utils.writeByteBuffer(bytes, os) - } catch { - case NonFatal(e) => - logWarning(s"Failed to put bytes of block $blockId into Tachyon", e) - os.cancel() - } finally { - os.close() - } - } - - override def putValues(blockId: BlockId, values: Iterator[_]): Unit = { - val file = getFile(blockId) - val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath)) - try { - blockManager.dataSerializeStream(blockId, os, values) - } catch { - case NonFatal(e) => - logWarning(s"Failed to put values of block $blockId into Tachyon", e) - os.cancel() - } finally { - os.close() - } - } - - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val file = getFile(blockId) - if (file == null) { - return None - } - val is = try { - client.getInStream(file) - } catch { - case _: FileDoesNotExistException => - return None - } - try { - val size = client.getInfo(file).length - val bs = new Array[Byte](size.asInstanceOf[Int]) - ByteStreams.readFully(is, bs) - Some(ByteBuffer.wrap(bs)) - } catch { - case NonFatal(e) => - logWarning(s"Failed to get bytes of block $blockId from Tachyon", e) - None - } finally { - is.close() - } - } - - override def getValues(blockId: BlockId): Option[Iterator[_]] = { - val file = getFile(blockId) - if (file == null) { - return None - } - val is = try { - client.getInStream(file) - } catch { - case _: FileDoesNotExistException => - return None - } - try { - Some(blockManager.dataDeserializeStream(blockId, is)) - } finally { - is.close() - } - } - - override def getSize(blockId: BlockId): Long = { - client.getInfo(getFile(blockId.name)).length - } - - def removeFile(file: TachyonFile): Unit = { - client.delete(file) - } - - def fileExists(file: TachyonFile): Boolean = { - try { - client.getInfo(file) - true - } catch { - case _: FileDoesNotExistException => false - } - } - - def getFile(filename: String): TachyonFile = { - // Figure out which tachyon directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(filename) - val dirId = hash % tachyonDirs.length - val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir - - // Create the subdirectory if it doesn't already exist - var subDir = subDirs(dirId)(subDirId) - if (subDir == null) { - subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old - } else { - val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") - client.mkdir(path) - val newDir = client.loadMetadata(path) - subDirs(dirId)(subDirId) = newDir - newDir - } - } - } - val filePath = new TachyonURI(s"$subDir/$filename") - try { - client.create(filePath) - } catch { - case _: FileAlreadyExistsException => client.loadMetadata(filePath) - } - } - - def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name) - - // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. - private def createTachyonDirs(): Array[TachyonFile] = { - logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") - val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").map { rootDir => - var foundLocalDir = false - var tachyonDir: TachyonFile = null - var tachyonDirId: String = null - var tries = 0 - val rand = new Random() - while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) { - tries += 1 - try { - tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId") - try { - foundLocalDir = client.mkdir(path) - tachyonDir = client.loadMetadata(path) - } catch { - case _: FileAlreadyExistsException => // continue - } - } catch { - case NonFatal(e) => - logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) - } - } - if (!foundLocalDir) { - logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS - + " attempts to create tachyon dir in " + rootDir) - System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR) - } - logInfo("Created tachyon directory at " + tachyonDir) - tachyonDir - } - } - - override def shutdown() { - logDebug("Shutdown hook called") - tachyonDirs.foreach { tachyonDir => - try { - if (!hasRootAsShutdownDeleteDir(tachyonDir)) { - deleteRecursively(tachyonDir, client) - } - } catch { - case NonFatal(e) => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) - } - } - } - - /** - * Delete a file or directory and its contents recursively. - */ - private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) { - client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build()) - } - - // Register the tachyon path to be deleted via shutdown hook - private def registerShutdownDeleteDir(file: TachyonFile) { - val absolutePath = client.getInfo(file).getPath - shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths += absolutePath - } - } - - // Remove the tachyon path to be deleted via shutdown hook - private def removeShutdownDeleteDir(file: TachyonFile) { - val absolutePath = client.getInfo(file).getPath - shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths -= absolutePath - } - } - - // Is the path already registered to be deleted via a shutdown hook ? - private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { - val absolutePath = client.getInfo(file).getPath - shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths.contains(absolutePath) - } - } - - // Note: if file is child of some registered path, while not equal to it, then return true; - // else false. This is to ensure that two shutdown hooks do not try to delete each others - // paths - resulting in Exception and incomplete cleanup. - private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = { - val absolutePath = client.getInfo(file).getPath - val hasRoot = shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths.exists( - path => !absolutePath.equals(path) && absolutePath.startsWith(path)) - } - if (hasRoot) { - logInfo(s"path = $absolutePath, already present as root for deletion.") - } - hasRoot - } - -} diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 04f584621e..c9bb49b83e 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -54,7 +54,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { "Cached Partitions", "Fraction Cached", "Size in Memory", - "Size in ExternalBlockStore", "Size on Disk") /** Render an HTML row representing an RDD */ @@ -71,7 +70,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { <td>{rdd.numCachedPartitions.toString}</td> <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td> <td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td> - <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td> <td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td> </tr> // scalastyle:on @@ -104,7 +102,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { "Executor ID", "Address", "Total Size in Memory", - "Total Size in ExternalBlockStore", "Total Size on Disk", "Stream Blocks") @@ -119,9 +116,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { <td sorttable_customkey={status.totalMemSize.toString}> {Utils.bytesToString(status.totalMemSize)} </td> - <td sorttable_customkey={status.totalExternalBlockStoreSize.toString}> - {Utils.bytesToString(status.totalExternalBlockStoreSize)} - </td> <td sorttable_customkey={status.totalDiskSize.toString}> {Utils.bytesToString(status.totalDiskSize)} </td> @@ -195,8 +189,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { ("Memory", block.memSize) } else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) { ("Memory Serialized", block.memSize) - } else if (block.storageLevel.useOffHeap) { - ("External", block.externalBlockStoreSize) } else { throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}") } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index a62fd2f339..a6460bc8b8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -409,14 +409,12 @@ private[spark] object JsonProtocol { ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ ("Memory Size" -> rddInfo.memSize) ~ - ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~ ("Disk Size" -> rddInfo.diskSize) } def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ - ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -425,7 +423,6 @@ private[spark] object JsonProtocol { val storageLevel = storageLevelToJson(blockStatus.storageLevel) ("Storage Level" -> storageLevel) ~ ("Memory Size" -> blockStatus.memSize) ~ - ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~ ("Disk Size" -> blockStatus.diskSize) } @@ -867,15 +864,11 @@ private[spark] object JsonProtocol { val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] - // fallback to tachyon for backward compatibility - val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome - .getOrElse(json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize - rddInfo.externalBlockStoreSize = externalBlockStoreSize rddInfo.diskSize = diskSize rddInfo } @@ -883,22 +876,16 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] - // fallback to tachyon for backward compatability - val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome - .getOrElse(json \ "Use Tachyon").extract[Boolean] val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication) + StorageLevel(useDisk, useMemory, deserialized, replication) } def blockStatusFromJson(json: JValue): BlockStatus = { val storageLevel = storageLevelFromJson(json \ "Storage Level") val memorySize = (json \ "Memory Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - // fallback to tachyon for backward compatability - val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome - .getOrElse(json \ "Tachyon Size").extract[Long] - BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize) + BlockStatus(storageLevel, memorySize, diskSize) } def executorInfoFromJson(json: JValue): ExecutorInfo = { |