Diffstat (limited to 'core')
21 files changed, 521 insertions, 324 deletions
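The diff below replaces the Tachyon-specific TachyonStore with a pluggable abstraction: a new ExternalBlockManager SPI plus an ExternalBlockStore front end that BlockManager uses for OFF_HEAP storage. As a rough sketch of what the new SPI asks of a backend, here is a minimal, hypothetical in-process implementation; it is not part of this commit, and the class name InMemoryBlockManager is invented for illustration. Because the SPI is private[spark], a real backend has to live under the org.apache.spark package, as the Tachyon implementation does.

package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.collection.concurrent.TrieMap

// Hypothetical toy backend: keeps block bytes in a concurrent in-process map.
// A production implementation would connect to its storage service in init()
// and throw IOException on failure, as the SPI scaladoc requires.
private[spark] class InMemoryBlockManager extends ExternalBlockManager {
  private val blocks = new TrieMap[String, Array[Byte]]()

  override def init(blockManager: BlockManager, executorId: String): Unit = ()

  override def removeBlock(blockId: BlockId): Boolean =
    blocks.remove(blockId.name).isDefined

  override def blockExists(blockId: BlockId): Boolean =
    blocks.contains(blockId.name)

  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
    val copy = new Array[Byte](bytes.remaining())
    bytes.duplicate().get(copy) // read through a duplicate so the caller's position is untouched
    blocks.put(blockId.name, copy)
  }

  override def getBytes(blockId: BlockId): Option[ByteBuffer] =
    blocks.get(blockId.name).map(ByteBuffer.wrap)

  override def getSize(blockId: BlockId): Long =
    blocks.get(blockId.name).map(_.length.toLong).getOrElse(0L)

  override def shutdown(): Unit = blocks.clear()
}

Pointing spark.externalBlockStore.blockManager at such a class would select it; when the key is unset, createBlkManager() falls back to org.apache.spark.storage.TachyonBlockManager for backward compatibility.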
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bae951f388..fe24260cdb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def eventLogDir: Option[URI] = _eventLogDir private[spark] def eventLogCodec: Option[String] = _eventLogCodec - // Generate the random name for a temp folder in Tachyon + // Generate the random name for a temp folder in external block store. // Add a timestamp as the suffix here to make it more safe - val tachyonFolderName = "spark-" + randomUUID.toString() + val externalBlockStoreFolderName = "spark-" + randomUUID.toString() + @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0") + val tachyonFolderName = externalBlockStoreFolderName def isLocal: Boolean = (master == "local" || master.startsWith("local[")) @@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - _conf.set("spark.tachyonStore.folderName", tachyonFolderName) + _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName) if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index 52862ae0ca..ea36fb60bd 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -33,11 +33,11 @@ object ExecutorExitCode { /** DiskStore failed to create a local temporary directory after many attempts. */ val DISK_STORE_FAILED_TO_CREATE_DIR = 53 - /** TachyonStore failed to initialize after many attempts. */ - val TACHYON_STORE_FAILED_TO_INITIALIZE = 54 + /** ExternalBlockStore failed to initialize after many attempts. */ + val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54 - /** TachyonStore failed to create a local temporary directory after many attempts. */ - val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55 + /** ExternalBlockStore failed to create a local temporary directory after many attempts. */ + val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55 def explainExitCode(exitCode: Int): String = { exitCode match { @@ -46,9 +46,11 @@ object ExecutorExitCode { case OOM => "OutOfMemoryError" case DISK_STORE_FAILED_TO_CREATE_DIR => "Failed to create local directory (bad spark.local.dir?)" - case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize." - case TACHYON_STORE_FAILED_TO_CREATE_DIR => - "TachyonStore failed to create a local temporary directory." + // TODO: replace external block store with concrete implementation name + case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize." + // TODO: replace external block store with concrete implementation name + case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR => + "ExternalBlockStore failed to create a local temporary directory." 
case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 330255f892..31c07c73fe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag]( val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else "" val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info => - " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format( + " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format( info.numCachedPartitions, bytesToString(info.memSize), - bytesToString(info.tachyonSize), bytesToString(info.diskSize))) + bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize))) s"$rdd [$persistence]" +: storageInfo } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 55718e584c..402ee1c764 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -78,19 +78,11 @@ private[spark] class BlockManager( private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] // Actual storage of where blocks are kept - private var tachyonInitialized = false + private var externalBlockStoreInitialized = false private[spark] val memoryStore = new MemoryStore(this, maxMemory) private[spark] val diskStore = new DiskStore(this, diskBlockManager) - private[spark] lazy val tachyonStore: TachyonStore = { - val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") - val appFolderName = conf.get("spark.tachyonStore.folderName") - val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" - val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") - val tachyonBlockManager = - new TachyonBlockManager(this, tachyonStorePath, tachyonMaster) - tachyonInitialized = true - new TachyonStore(this, tachyonBlockManager) - } + private[spark] lazy val externalBlockStore: ExternalBlockStore = + new ExternalBlockStore(this, executorId) private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) @@ -320,13 +312,13 @@ private[spark] class BlockManager( /** * Get the BlockStatus for the block identified by the given ID, if it exists. - * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon. + * NOTE: This is mainly for testing, and it doesn't fetch information from external block store. 
*/ def getStatus(blockId: BlockId): Option[BlockStatus] = { blockInfo.get(blockId).map { info => val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L - // Assume that block is not in Tachyon + // Assume that block is not in external block store BlockStatus(info.level, memSize, diskSize, 0L) } } @@ -376,10 +368,10 @@ private[spark] class BlockManager( if (info.tellMaster) { val storageLevel = status.storageLevel val inMemSize = Math.max(status.memSize, droppedMemorySize) - val inTachyonSize = status.tachyonSize + val inExternalBlockStoreSize = status.externalBlockStoreSize val onDiskSize = status.diskSize master.updateBlockInfo( - blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize) + blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize) } else { true } @@ -397,15 +389,17 @@ private[spark] class BlockManager( BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) - val inTachyon = level.useOffHeap && tachyonStore.contains(blockId) + val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val deserialized = if (inMem) level.deserialized else false - val replication = if (inMem || inTachyon || onDisk) level.replication else 1 - val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication) + val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1 + val storageLevel = + StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L - val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L + val externalBlockStoreSize = + if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - BlockStatus(storageLevel, memSize, diskSize, tachyonSize) + BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize) } } } @@ -485,11 +479,11 @@ private[spark] class BlockManager( } } - // Look for the block in Tachyon + // Look for the block in external block store if (level.useOffHeap) { - logDebug(s"Getting block $blockId from tachyon") - if (tachyonStore.contains(blockId)) { - tachyonStore.getBytes(blockId) match { + logDebug(s"Getting block $blockId from ExternalBlockStore") + if (externalBlockStore.contains(blockId)) { + externalBlockStore.getBytes(blockId) match { case Some(bytes) => if (!asBlockResult) { return Some(bytes) @@ -498,7 +492,7 @@ private[spark] class BlockManager( dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size)) } case None => - logDebug(s"Block $blockId not found in tachyon") + logDebug(s"Block $blockId not found in externalBlockStore") } } } @@ -766,8 +760,8 @@ private[spark] class BlockManager( // We will drop it to disk later if the memory store can't hold it. 
(true, memoryStore) } else if (putLevel.useOffHeap) { - // Use tachyon for off-heap storage - (false, tachyonStore) + // Use external block store + (false, externalBlockStore) } else if (putLevel.useDisk) { // Don't get back the bytes from put unless we replicate them (putLevel.replication > 1, diskStore) @@ -802,7 +796,7 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { - // Now that the block is in either the memory, tachyon, or disk store, + // Now that the block is in either the memory, externalBlockStore, or disk store, // let other threads read it, and tell the master about it. marked = true putBlockInfo.markReady(size) @@ -1099,10 +1093,11 @@ private[spark] class BlockManager( // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) - val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false - if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { + val removedFromExternalBlockStore = + if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) { logWarning(s"Block $blockId could not be removed as it was not found in either " + - "the disk, memory, or tachyon store") + "the disk, memory, or external block store") } blockInfo.remove(blockId) if (tellMaster && info.tellMaster) { @@ -1136,7 +1131,7 @@ private[spark] class BlockManager( val level = info.level if (level.useMemory) { memoryStore.remove(id) } if (level.useDisk) { diskStore.remove(id) } - if (level.useOffHeap) { tachyonStore.remove(id) } + if (level.useOffHeap) { externalBlockStore.remove(id) } iterator.remove() logInfo(s"Dropped block $id") } @@ -1216,8 +1211,8 @@ private[spark] class BlockManager( blockInfo.clear() memoryStore.clear() diskStore.clear() - if (tachyonInitialized) { - tachyonStore.clear() + if (externalBlockStoreInitialized) { + externalBlockStore.clear() } metadataCleaner.cancel() broadcastCleaner.cancel() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 9bfc4201d3..a85e1c7632 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -54,9 +54,10 @@ class BlockManagerMaster( storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long): Boolean = { + externalBlockStoreSize: Long): Boolean = { val res = driverEndpoint.askWithRetry[Boolean]( - UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)) + UpdateBlockInfo(blockManagerId, blockId, storageLevel, + memSize, diskSize, externalBlockStoreSize)) logDebug(s"Updated info of block $blockId") res } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7212362df5..3afb4c3c02 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint( context.reply(true) case UpdateBlockInfo( - blockManagerId, blockId, storageLevel, deserializedSize, 
size, tachyonSize) => + blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) => context.reply(updateBlockInfo( - blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)) + blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize)) case GetLocations(blockId) => context.reply(getLocations(blockId)) @@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint( storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long): Boolean = { + externalBlockStoreSize: Long): Boolean = { if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { @@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint( } blockManagerInfo(blockManagerId).updateBlockInfo( - blockId, storageLevel, memSize, diskSize, tachyonSize) + blockId, storageLevel, memSize, diskSize, externalBlockStoreSize) var locations: mutable.HashSet[BlockManagerId] = null if (blockLocations.containsKey(blockId)) { @@ -396,8 +396,8 @@ case class BlockStatus( storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long) { - def isCached: Boolean = memSize + diskSize + tachyonSize > 0 + externalBlockStoreSize: Long) { + def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0 } @DeveloperApi @@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo( storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long) { + externalBlockStoreSize: Long) { updateLastSeenMs() @@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo( } if (storageLevel.isValid) { - /* isValid means it is either stored in-memory, on-disk or on-Tachyon. + /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore. * The memSize here indicates the data size in or dropped from memory, - * tachyonSize here indicates the data size in or dropped from Tachyon, + * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore, * and the diskSize here indicates the data size in or dropped to disk. * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */ @@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo( blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } if (storageLevel.useOffHeap) { - _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize)) - logInfo("Added %s on tachyon on %s (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize))) + _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)) + logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize))) } } else if (_blocks.containsKey(blockId)) { // If isValid is not true, drop the block. 
@@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo( blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) } if (blockStatus.storageLevel.useOffHeap) { - logInfo("Removed %s on %s on tachyon (size: %s)".format( - blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize))) + logInfo("Removed %s on %s on externalBlockStore (size: %s)".format( + blockId, blockManagerId.hostPort, + Utils.bytesToString(blockStatus.externalBlockStoreSize))) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index f89d8d7493..1683576067 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages { var storageLevel: StorageLevel, var memSize: Long, var diskSize: Long, - var tachyonSize: Long) + var externalBlockStoreSize: Long) extends ToBlockManagerMaster with Externalizable { @@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages { storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) - out.writeLong(tachyonSize) + out.writeLong(externalBlockStoreSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages { storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() - tachyonSize = in.readLong() + externalBlockStoreSize = in.readLong() } } diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala new file mode 100644 index 0000000000..8964762df6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.nio.ByteBuffer + +/** + * An abstract class that a concrete external block manager must extend. + * The class must have a no-argument constructor, and is initialized by init, + * which is invoked by ExternalBlockStore. The main input parameter to all the + * methods is blockId, the unique identifier of a block within one Spark application. + * + * The underlying external block manager should avoid namespace conflicts among multiple + * Spark applications, for example by creating a different directory for each application + * via randomUUID. + */ +private[spark] abstract class ExternalBlockManager { + + override def toString: String = {"External Block Store"} + + /** + * Initialize a concrete block manager implementation.
The subclass should initialize its internal + * data structures, e.g., the file system, in this function, which is invoked by ExternalBlockStore + * right after the class is constructed. The function should throw IOException on failure. + * + * @throws java.io.IOException if there is any file system failure during the initialization. + */ + def init(blockManager: BlockManager, executorId: String): Unit + + /** + * Drop the block from the underlying external block store, if it exists. + * @return true on successfully removing the block + * false if the block could not be removed because it was not found + * + * @throws java.io.IOException if there is any file system failure in removing the block. + */ + def removeBlock(blockId: BlockId): Boolean + + /** + * Used by BlockManager to check the existence of the block in the underlying external + * block store. + * @return true if the block exists. + * false if the block does not exist. + * + * @throws java.io.IOException if there is any file system failure in checking + * the block existence. + */ + def blockExists(blockId: BlockId): Boolean + + /** + * Put the given block to the underlying external block store. Note that in the normal case, + * putting a block should never fail unless something is wrong with the underlying + * external block store, e.g., a file system failure. In this case, an IOException + * should be thrown. + * + * @throws java.io.IOException if there is any file system failure in putting the block. + */ + def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit + + /** + * Retrieve the block bytes. + * @return Some(ByteBuffer) if the block bytes are successfully retrieved + * None if the block does not exist in the external block store. + * + * @throws java.io.IOException if there is any file system failure in getting the block. + */ + def getBytes(blockId: BlockId): Option[ByteBuffer] + + /** + * Get the size of the block saved in the underlying external block store, + * which was saved earlier by putBytes. + * @return size of the block + * 0 if the block does not exist + * + * @throws java.io.IOException if there is any file system failure in getting the block size. + */ + def getSize(blockId: BlockId): Long + + /** + * Clean up any information persisted in the underlying external block store, + * e.g., the directory, files, etc. This is invoked by the shutdown hook of ExternalBlockStore + * during system shutdown. + */ + def shutdown() } diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala new file mode 100644 index 0000000000..0bf770306a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.storage + +import java.nio.ByteBuffer +import scala.util.control.NonFatal +import org.apache.spark.Logging +import org.apache.spark.util.Utils + + +/** + * Stores BlockManager blocks on ExternalBlockStore. + * Any potential exception from the underlying implementation is caught + * and the expected failure value is returned. + */ +private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String) + extends BlockStore(blockManager: BlockManager) with Logging { + + lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager() + + logInfo("ExternalBlockStore started") + + override def getSize(blockId: BlockId): Long = { + try { + externalBlockManager.map(_.getSize(blockId)).getOrElse(0) + } catch { + case NonFatal(t) => + logError(s"error in getSize from $blockId", t) + 0L + } + } + + override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = { + putIntoExternalBlockStore(blockId, bytes, returnValues = true) + } + + override def putArray( + blockId: BlockId, + values: Array[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + putIterator(blockId, values.toIterator, level, returnValues) + } + + override def putIterator( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + logDebug(s"Attempting to write values for block $blockId") + val bytes = blockManager.dataSerialize(blockId, values) + putIntoExternalBlockStore(blockId, bytes, returnValues) + } + + private def putIntoExternalBlockStore( + blockId: BlockId, + bytes: ByteBuffer, + returnValues: Boolean): PutResult = { + // So that we do not modify the input offsets! + // duplicate() does not copy the buffer, so it is inexpensive + val byteBuffer = bytes.duplicate() + byteBuffer.rewind() + logDebug(s"Attempting to put block $blockId into ExtBlk store") + // We should never reach here if externalBlockManager is None; handle it anyway for safety.
+ try { + val startTime = System.currentTimeMillis + if (externalBlockManager.isDefined) { + externalBlockManager.get.putBytes(blockId, byteBuffer) + val finishTime = System.currentTimeMillis + logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format( + blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime)) + + if (returnValues) { + PutResult(bytes.limit(), Right(bytes.duplicate())) + } else { + PutResult(bytes.limit(), null) + } + } else { + logError(s"error in putBytes $blockId") + PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty))) + } + } catch { + case NonFatal(t) => + logError(s"error in putBytes $blockId", t) + PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty))) + } + } + + // We assume the block is removed even if an exception is thrown + override def remove(blockId: BlockId): Boolean = { + try { + externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true) + } catch { + case NonFatal(t) => + logError(s"error in removing $blockId", t) + true + } + } + + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) + } + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + try { + externalBlockManager.flatMap(_.getBytes(blockId)) + } catch { + case NonFatal(t) => + logError(s"error in getBytes from $blockId", t) + None + } + } + + override def contains(blockId: BlockId): Boolean = { + try { + val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false) + if (!ret) { + logInfo(s"remove block $blockId") + blockManager.removeBlock(blockId, true) + } + ret + } catch { + case NonFatal(t) => + logError(s"error in blockExists for $blockId", t) + false + } + } + + private def addShutdownHook() { + Runtime.getRuntime.addShutdownHook(new Thread("ExternalBlockStore shutdown hook") { + override def run(): Unit = Utils.logUncaughtExceptions { + logDebug("Shutdown hook called") + externalBlockManager.foreach(_.shutdown()) + } + }) + } + + // Create concrete block manager and fall back to Tachyon by default for backward compatibility.
+ private def createBlkManager(): Option[ExternalBlockManager] = { + val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME) + .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME) + + try { + val instance = Class.forName(clsName) + .newInstance() + .asInstanceOf[ExternalBlockManager] + instance.init(blockManager, executorId) + addShutdownHook(); + Some(instance) + } catch { + case NonFatal(t) => + logError("Cannot initialize external block store", t) + None + } + } +} + +private[spark] object ExternalBlockStore extends Logging { + val MAX_DIR_CREATION_ATTEMPTS = 10 + val SUB_DIRS_PER_DIR = "64" + val BASE_DIR = "spark.externalBlockStore.baseDir" + val FOLD_NAME = "spark.externalBlockStore.folderName" + val MASTER_URL = "spark.externalBlockStore.url" + val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager" + val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager" +} diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 034525b56f..ad53a3edc7 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -32,16 +32,17 @@ class RDDInfo( var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L - var tachyonSize = 0L + var externalBlockStoreSize = 0L - def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0 + def isCached: Boolean = + (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0 override def toString: String = { import Utils.bytesToString ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " + - "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format( + "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format( name, id, storageLevel.toString, numCachedPartitions, numPartitions, - bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize)) + bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize)) } override def compare(that: RDDInfo): Int = { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 134abea866..703bce3e6b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -26,9 +26,9 @@ import org.apache.spark.util.Utils /** * :: DeveloperApi :: * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, - * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to - * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on - * multiple nodes. + * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or + * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether + * to replicate the RDD partitions on multiple nodes. * * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants * for commonly useful storage levels. 
To create your own storage level object, use the @@ -126,7 +126,7 @@ class StorageLevel private( var result = "" result += (if (useDisk) "Disk " else "") result += (if (useMemory) "Memory " else "") - result += (if (useOffHeap) "Tachyon " else "") + result += (if (useOffHeap) "ExternalBlockStore " else "") result += (if (deserialized) "Deserialized " else "Serialized ") result += s"${replication}x Replicated" result diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 2bd6b749be..c4ac30092f 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -199,33 +199,34 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty) val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize - val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize + val changeInExternalBlockStore = + newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize val level = newBlockStatus.storageLevel // Compute new info from old info - val (oldMem, oldDisk, oldTachyon) = blockId match { + val (oldMem, oldDisk, oldExternalBlockStore) = blockId match { case RDDBlockId(rddId, _) => _rddStorageInfo.get(rddId) - .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) } + .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) } .getOrElse((0L, 0L, 0L)) case _ => _nonRddStorageInfo } val newMem = math.max(oldMem + changeInMem, 0L) val newDisk = math.max(oldDisk + changeInDisk, 0L) - val newTachyon = math.max(oldTachyon + changeInTachyon, 0L) + val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L) // Set the correct info blockId match { case RDDBlockId(rddId, _) => // If this RDD is no longer persisted, remove it - if (newMem + newDisk + newTachyon == 0) { + if (newMem + newDisk + newExternalBlockStore == 0) { _rddStorageInfo.remove(rddId) } else { - _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level) + _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level) } case _ => - _nonRddStorageInfo = (newMem, newDisk, newTachyon) + _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore) } } @@ -247,13 +248,13 @@ private[spark] object StorageUtils { val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum val memSize = statuses.map(_.memUsedByRdd(rddId)).sum val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum - val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum + val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum rddInfo.storageLevel = storageLevel rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize rddInfo.diskSize = diskSize - rddInfo.tachyonSize = tachyonSize + rddInfo.externalBlockStoreSize = externalBlockStoreSize } } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index 583f1fdf04..bdc6276e41 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -17,13 +17,16 @@ package org.apache.spark.storage +import java.io.IOException +import java.nio.ByteBuffer import 
java.text.SimpleDateFormat import java.util.{Date, Random} +import com.google.common.io.ByteStreams +import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile} import tachyon.TachyonURI -import tachyon.client.{TachyonFile, TachyonFS} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, SparkConf, Logging} import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.util.Utils @@ -32,32 +35,94 @@ import org.apache.spark.util.Utils * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By * default, one block is mapped to one file with a name given by its BlockId. * - * @param rootDirs The directories to use for storing block files. Data will be hashed among these. */ -private[spark] class TachyonBlockManager( - blockManager: BlockManager, - rootDirs: String, - val master: String) - extends Logging { +private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging { - val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null - - if (client == null) { - logError("Failed to connect to the Tachyon as the master address is not configured") - System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE) - } - - private val MAX_DIR_CREATION_ATTEMPTS = 10 - private val subDirsPerTachyonDir = - blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt + var blockManager: BlockManager = _ + var rootDirs: String = _ + var master: String = _ + var client: tachyon.client.TachyonFS = _ + private var subDirsPerTachyonDir: Int = _ // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; // then, inside this directory, create multiple subdirectories that we will hash files into, // in order to avoid having really large inodes at the top level in Tachyon. - private val tachyonDirs: Array[TachyonFile] = createTachyonDirs() - private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) + private var tachyonDirs: Array[TachyonFile] = _ + private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _ + + + override def init(blockManager: BlockManager, executorId: String): Unit = { + this.blockManager = blockManager + val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon") + val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME) + + rootDirs = s"$storeDir/$appFolderName/$executorId" + master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998") + client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null + // The original implementation called System.exit; we throw instead, so that Spark + // can keep running without external block store support. + if (client == null) { + logError("Failed to connect to Tachyon because the master address is not configured") + throw new IOException("Failed to connect to Tachyon because the master " + + "address is not configured") + } + subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories", + ExternalBlockStore.SUB_DIRS_PER_DIR).toInt + + // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; + // then, inside this directory, create multiple subdirectories that we will hash files into, + // in order to avoid having really large inodes at the top level in Tachyon.
+ tachyonDirs = createTachyonDirs() + subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) + tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) + } + + override def toString: String = {"ExternalBlockStore-Tachyon"} + + override def removeBlock(blockId: BlockId): Boolean = { + val file = getFile(blockId) + if (fileExists(file)) { + removeFile(file) + } else { + false + } + } + + override def blockExists(blockId: BlockId): Boolean = { + val file = getFile(blockId) + fileExists(file) + } - addShutdownHook() + override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = { + val file = getFile(blockId) + val os = file.getOutStream(WriteType.TRY_CACHE) + os.write(bytes.array()) + os.close() + } + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = getFile(blockId) + if (file == null || file.getLocationHosts.size == 0) { + return None + } + val is = file.getInStream(ReadType.CACHE) + assert (is != null) + try { + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) + } catch { + case ioe: IOException => + logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe) + None + } finally { + is.close() + } + } + + override def getSize(blockId: BlockId): Long = { + getFile(blockId.name).length + } def removeFile(file: TachyonFile): Boolean = { client.delete(new TachyonURI(file.getPath()), false) @@ -109,7 +174,7 @@ private[spark] class TachyonBlockManager( var tachyonDirId: String = null var tries = 0 val rand = new Random() - while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) { tries += 1 try { tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) @@ -124,30 +189,27 @@ private[spark] class TachyonBlockManager( } } if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + - rootDir) - System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) + logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS + + " attempts to create tachyon dir in " + rootDir) + System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR) } logInfo("Created tachyon directory at " + tachyonDir) tachyonDir } } - private def addShutdownHook() { - tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) - Utils.addShutdownHook { () => - logDebug("Shutdown hook called") - tachyonDirs.foreach { tachyonDir => - try { - if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { - Utils.deleteRecursively(tachyonDir, client) - } - } catch { - case e: Exception => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) + override def shutdown() { + logDebug("Shutdown hook called") + tachyonDirs.foreach { tachyonDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { + Utils.deleteRecursively(tachyonDir, client) } + } catch { + case e: Exception => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) } - client.close() } + client.close() } } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala deleted file mode 100644 index 65fa81704c..0000000000 --- a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import tachyon.client.TachyonFile - -/** - * References a particular segment of a file (potentially the entire file), based off an offset and - * a length. - */ -private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) { - override def toString: String = { - "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length) - } -} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala deleted file mode 100644 index 233d1e2b7c..0000000000 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -import java.io.IOException -import java.nio.ByteBuffer - -import com.google.common.io.ByteStreams -import tachyon.client.{ReadType, WriteType} - -import org.apache.spark.Logging -import org.apache.spark.util.Utils - -/** - * Stores BlockManager blocks on Tachyon. 
- */ -private[spark] class TachyonStore( - blockManager: BlockManager, - tachyonManager: TachyonBlockManager) - extends BlockStore(blockManager: BlockManager) with Logging { - - logInfo("TachyonStore started") - - override def getSize(blockId: BlockId): Long = { - tachyonManager.getFile(blockId.name).length - } - - override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = { - putIntoTachyonStore(blockId, bytes, returnValues = true) - } - - override def putArray( - blockId: BlockId, - values: Array[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIterator(blockId, values.toIterator, level, returnValues) - } - - override def putIterator( - blockId: BlockId, - values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - logDebug(s"Attempting to write values for block $blockId") - val bytes = blockManager.dataSerialize(blockId, values) - putIntoTachyonStore(blockId, bytes, returnValues) - } - - private def putIntoTachyonStore( - blockId: BlockId, - bytes: ByteBuffer, - returnValues: Boolean): PutResult = { - // So that we do not modify the input offsets ! - // duplicate does not copy buffer, so inexpensive - val byteBuffer = bytes.duplicate() - byteBuffer.rewind() - logDebug(s"Attempting to put block $blockId into Tachyon") - val startTime = System.currentTimeMillis - val file = tachyonManager.getFile(blockId) - val os = file.getOutStream(WriteType.TRY_CACHE) - os.write(byteBuffer.array()) - os.close() - val finishTime = System.currentTimeMillis - logDebug("Block %s stored as %s file in Tachyon in %d ms".format( - blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime)) - - if (returnValues) { - PutResult(bytes.limit(), Right(bytes.duplicate())) - } else { - PutResult(bytes.limit(), null) - } - } - - override def remove(blockId: BlockId): Boolean = { - val file = tachyonManager.getFile(blockId) - if (tachyonManager.fileExists(file)) { - tachyonManager.removeFile(file) - } else { - false - } - } - - override def getValues(blockId: BlockId): Option[Iterator[Any]] = { - getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) - } - - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val file = tachyonManager.getFile(blockId) - if (file == null || file.getLocationHosts.size == 0) { - return None - } - val is = file.getInStream(ReadType.CACHE) - assert (is != null) - try { - val size = file.length - val bs = new Array[Byte](size.asInstanceOf[Int]) - ByteStreams.readFully(is, bs) - Some(ByteBuffer.wrap(bs)) - } catch { - case ioe: IOException => - logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe) - None - } finally { - is.close() - } - } - - override def contains(blockId: BlockId): Boolean = { - val file = tachyonManager.getFile(blockId) - tachyonManager.fileExists(file) - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 6ced6052d2..59dc6b547c 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -42,7 +42,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { "Cached Partitions", "Fraction Cached", "Size in Memory", - "Size in Tachyon", + "Size in ExternalBlockStore", "Size on Disk") /** Render an HTML row representing an RDD */ @@ -59,7 +59,7 @@ private[ui] class StoragePage(parent: StorageTab) extends 
WebUIPage("") { <td>{rdd.numCachedPartitions}</td> <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td> <td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td> - <td sorttable_customkey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td> + <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td> <td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td> </tr> // scalastyle:on diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 474f79fb75..44d274956d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -373,14 +373,14 @@ private[spark] object JsonProtocol { ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ ("Memory Size" -> rddInfo.memSize) ~ - ("Tachyon Size" -> rddInfo.tachyonSize) ~ + ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~ ("Disk Size" -> rddInfo.diskSize) } def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ - ("Use Tachyon" -> storageLevel.useOffHeap) ~ + ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -389,7 +389,7 @@ private[spark] object JsonProtocol { val storageLevel = storageLevelToJson(blockStatus.storageLevel) ("Storage Level" -> storageLevel) ~ ("Memory Size" -> blockStatus.memSize) ~ - ("Tachyon Size" -> blockStatus.tachyonSize) ~ + ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~ ("Disk Size" -> blockStatus.diskSize) } @@ -787,13 +787,15 @@ private[spark] object JsonProtocol { val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] - val tachyonSize = (json \ "Tachyon Size").extract[Long] + // fallback to tachyon for backward compatability + val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome + .getOrElse(json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize - rddInfo.tachyonSize = tachyonSize + rddInfo.externalBlockStoreSize = externalBlockStoreSize rddInfo.diskSize = diskSize rddInfo } @@ -801,18 +803,22 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] - val useTachyon = (json \ "Use Tachyon").extract[Boolean] + // fallback to tachyon for backward compatability + val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome + .getOrElse(json \ "Use Tachyon").extract[Boolean] val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication) + StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication) } def blockStatusFromJson(json: JValue): BlockStatus = { val storageLevel = 
storageLevelFromJson(json \ "Storage Level") val memorySize = (json \ "Memory Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - val tachyonSize = (json \ "Tachyon Size").extract[Long] - BlockStatus(storageLevel, memorySize, diskSize, tachyonSize) + // fall back to Tachyon for backward compatibility + val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome + .getOrElse(json \ "Tachyon Size").extract[Long] + BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize) } def executorInfoFromJson(json: JValue): ExecutorInfo = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f5b410f41d..151955ef7f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -526,6 +526,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach test("tachyon storage") { // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar. val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false) + conf.set(ExternalBlockStore.BLOCK_MANAGER_NAME, ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME) if (tachyonUnitTestEnabled) { store = makeBlockManager(1200) val a1 = new Array[Byte](400) diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala index ef5c55f91c..ee1071cbcd 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala @@ -240,11 +240,11 @@ class StorageSuite extends FunSuite { assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000)) } - test("storage status memUsed, diskUsed, tachyonUsed") { + test("storage status memUsed, diskUsed, externalBlockStoreUsed") { val status = storageStatus2 def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum - def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum + def actualOffHeapUsed: Long = status.blocks.values.map(_.externalBlockStoreSize).sum assert(status.memUsed === actualMemUsed) assert(status.diskUsed === actualDiskUsed) assert(status.offHeapUsed === actualOffHeapUsed) @@ -300,12 +300,12 @@ class StorageSuite extends FunSuite { assert(rddInfos(0).numCachedPartitions === 5) assert(rddInfos(0).memSize === 5L) assert(rddInfos(0).diskSize === 10L) - assert(rddInfos(0).tachyonSize === 0L) + assert(rddInfos(0).externalBlockStoreSize === 0L) assert(rddInfos(1).storageLevel === memAndDisk) assert(rddInfos(1).numCachedPartitions === 3) assert(rddInfos(1).memSize === 3L) assert(rddInfos(1).diskSize === 6L) - assert(rddInfos(1).tachyonSize === 0L) + assert(rddInfos(1).externalBlockStoreSize === 0L) } test("StorageUtils.getRddBlockLocations") { diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 3744e479d2..0ea9ea47b4 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -133,12 +133,12 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
assert(storageListener._rddInfoMap(0).memSize === 800L) assert(storageListener._rddInfoMap(0).diskSize === 400L) - assert(storageListener._rddInfoMap(0).tachyonSize === 200L) + assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L) assert(storageListener._rddInfoMap(0).numCachedPartitions === 3) assert(storageListener._rddInfoMap(0).isCached) assert(storageListener._rddInfoMap(1).memSize === 0L) assert(storageListener._rddInfoMap(1).diskSize === 240L) - assert(storageListener._rddInfoMap(1).tachyonSize === 0L) + assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L) assert(storageListener._rddInfoMap(1).numCachedPartitions === 1) assert(storageListener._rddInfoMap(1).isCached) assert(!storageListener._rddInfoMap(2).isCached) @@ -155,7 +155,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2)) assert(storageListener._rddInfoMap(0).memSize === 400L) assert(storageListener._rddInfoMap(0).diskSize === 400L) - assert(storageListener._rddInfoMap(0).tachyonSize === 200L) + assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L) assert(storageListener._rddInfoMap(0).numCachedPartitions === 2) assert(storageListener._rddInfoMap(0).isCached) assert(!storageListener._rddInfoMap(1).isCached) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a2be724254..2d039cb75a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -785,14 +785,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 201, | "Number of Cached Partitions": 301, | "Memory Size": 401, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 501 | } | ], @@ -969,12 +969,12 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": false, | "Replication": 2 | }, | "Memory Size": 0, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 0 | } | } @@ -1052,12 +1052,12 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": false, | "Replication": 2 | }, | "Memory Size": 0, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 0 | } | } @@ -1135,12 +1135,12 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": false, | "Replication": 2 | }, | "Memory Size": 0, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 0 | } | } @@ -1168,14 +1168,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 200, | "Number of Cached Partitions": 300, | "Memory Size": 400, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 500 | } | ], @@ -1207,14 +1207,14 
@@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 400, | "Number of Cached Partitions": 600, | "Memory Size": 800, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 1000 | }, | { @@ -1223,14 +1223,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 401, | "Number of Cached Partitions": 601, | "Memory Size": 801, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 1001 | } | ], @@ -1262,14 +1262,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 600, | "Number of Cached Partitions": 900, | "Memory Size": 1200, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 1500 | }, | { @@ -1278,14 +1278,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 601, | "Number of Cached Partitions": 901, | "Memory Size": 1201, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 1501 | }, | { @@ -1294,14 +1294,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 602, | "Number of Cached Partitions": 902, | "Memory Size": 1202, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 1502 | } | ], @@ -1333,14 +1333,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 800, | "Number of Cached Partitions": 1200, | "Memory Size": 1600, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 2000 | }, | { @@ -1349,14 +1349,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 801, | "Number of Cached Partitions": 1201, | "Memory Size": 1601, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 2001 | }, | { @@ -1365,14 +1365,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | "Replication": 1 | }, | "Number of Partitions": 802, | "Number of Cached Partitions": 1202, | "Memory Size": 1602, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 2002 | }, | { @@ -1381,14 +1381,14 @@ class JsonProtocolSuite extends FunSuite { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, - | "Use Tachyon": false, + | "Use ExternalBlockStore": false, | "Deserialized": true, | 
"Replication": 1 | }, | "Number of Partitions": 803, | "Number of Cached Partitions": 1203, | "Memory Size": 1603, - | "Tachyon Size": 0, + | "ExternalBlockStore Size": 0, | "Disk Size": 2003 | } | ], |