Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                     |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala        |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                           |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala              |  63
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala        |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala |  29
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala      |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala      | 102
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala        | 181
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/RDDInfo.scala                   |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala              |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala              |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala       | 138
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala        |  30
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonStore.scala              | 128
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala            |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                 |  24
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala         |   1
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageSuite.scala              |   8
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala        |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala            |  56
21 files changed, 521 insertions(+), 324 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bae951f388..fe24260cdb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
- // Generate the random name for a temp folder in Tachyon
+ // Generate the random name for a temp folder in external block store.
// Add a timestamp as the suffix here to make it more safe
- val tachyonFolderName = "spark-" + randomUUID.toString()
+ val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
+ @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
+ val tachyonFolderName = externalBlockStoreFolderName
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
@@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
+ _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 52862ae0ca..ea36fb60bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -33,11 +33,11 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
- /** TachyonStore failed to initialize after many attempts. */
- val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
+ /** ExternalBlockStore failed to initialize after many attempts. */
+ val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54
- /** TachyonStore failed to create a local temporary directory after many attempts. */
- val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
+ /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
+ val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
def explainExitCode(exitCode: Int): String = {
exitCode match {
@@ -46,9 +46,11 @@ object ExecutorExitCode {
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
- case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
- case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
- "TachyonStore failed to create a local temporary directory."
+ // TODO: replace external block store with concrete implementation name
+ case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
+ // TODO: replace external block store with concrete implementation name
+ case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
+ "ExternalBlockStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 330255f892..31c07c73fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
- " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
+ " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
info.numCachedPartitions, bytesToString(info.memSize),
- bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
+ bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
s"$rdd [$persistence]" +: storageInfo
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 55718e584c..402ee1c764 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -78,19 +78,11 @@ private[spark] class BlockManager(
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
// Actual storage of where blocks are kept
- private var tachyonInitialized = false
+ private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
- private[spark] lazy val tachyonStore: TachyonStore = {
- val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
- val appFolderName = conf.get("spark.tachyonStore.folderName")
- val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
- val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
- val tachyonBlockManager =
- new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
- tachyonInitialized = true
- new TachyonStore(this, tachyonBlockManager)
- }
+ private[spark] lazy val externalBlockStore: ExternalBlockStore =
+ new ExternalBlockStore(this, executorId)
private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -320,13 +312,13 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
+ * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
- // Assume that block is not in Tachyon
+ // Assume that block is not in external block store
BlockStatus(info.level, memSize, diskSize, 0L)
}
}
@@ -376,10 +368,10 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val inTachyonSize = status.tachyonSize
+ val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
master.updateBlockInfo(
- blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
+ blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
} else {
true
}
@@ -397,15 +389,17 @@ private[spark] class BlockManager(
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
- val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
+ val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
- val replication = if (inMem || inTachyon || onDisk) level.replication else 1
- val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
+ val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+ val storageLevel =
+ StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
- val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
+ val externalBlockStoreSize =
+ if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
+ BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
}
}
}
@@ -485,11 +479,11 @@ private[spark] class BlockManager(
}
}
- // Look for the block in Tachyon
+ // Look for the block in external block store
if (level.useOffHeap) {
- logDebug(s"Getting block $blockId from tachyon")
- if (tachyonStore.contains(blockId)) {
- tachyonStore.getBytes(blockId) match {
+ logDebug(s"Getting block $blockId from ExternalBlockStore")
+ if (externalBlockStore.contains(blockId)) {
+ externalBlockStore.getBytes(blockId) match {
case Some(bytes) =>
if (!asBlockResult) {
return Some(bytes)
@@ -498,7 +492,7 @@ private[spark] class BlockManager(
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
- logDebug(s"Block $blockId not found in tachyon")
+ logDebug(s"Block $blockId not found in externalBlockStore")
}
}
}
@@ -766,8 +760,8 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
- // Use tachyon for off-heap storage
- (false, tachyonStore)
+ // Use external block store
+ (false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
@@ -802,7 +796,7 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
- // Now that the block is in either the memory, tachyon, or disk store,
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
@@ -1099,10 +1093,11 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
- val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
- if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+ val removedFromExternalBlockStore =
+ if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
+ if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
- "the disk, memory, or tachyon store")
+ "the disk, memory, or external block store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
@@ -1136,7 +1131,7 @@ private[spark] class BlockManager(
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
- if (level.useOffHeap) { tachyonStore.remove(id) }
+ if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
@@ -1216,8 +1211,8 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
- if (tachyonInitialized) {
- tachyonStore.clear()
+ if (externalBlockStoreInitialized) {
+ externalBlockStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 9bfc4201d3..a85e1c7632 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,9 +54,10 @@ class BlockManagerMaster(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long): Boolean = {
+ externalBlockStoreSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel,
+ memSize, diskSize, externalBlockStoreSize))
logDebug(s"Updated info of block $blockId")
res
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 7212362df5..3afb4c3c02 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
context.reply(true)
case UpdateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
+ blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
+ blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long): Boolean = {
+ externalBlockStoreSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
}
blockManagerInfo(blockManagerId).updateBlockInfo(
- blockId, storageLevel, memSize, diskSize, tachyonSize)
+ blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long) {
- def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+ externalBlockStoreSize: Long) {
+ def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
}
@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long) {
+ externalBlockStoreSize: Long) {
updateLastSeenMs()
@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
}
if (storageLevel.isValid) {
- /* isValid means it is either stored in-memory, on-disk or on-Tachyon.
+ /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
* The memSize here indicates the data size in or dropped from memory,
- * tachyonSize here indicates the data size in or dropped from Tachyon,
+ * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
- logInfo("Added %s on tachyon on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
+ _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+ logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
@@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
if (blockStatus.storageLevel.useOffHeap) {
- logInfo("Removed %s on %s on tachyon (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
+ logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
+ blockId, blockManagerId.hostPort,
+ Utils.bytesToString(blockStatus.externalBlockStoreSize)))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index f89d8d7493..1683576067 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
var storageLevel: StorageLevel,
var memSize: Long,
var diskSize: Long,
- var tachyonSize: Long)
+ var externalBlockStoreSize: Long)
extends ToBlockManagerMaster
with Externalizable {
@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
- out.writeLong(tachyonSize)
+ out.writeLong(externalBlockStoreSize)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
- tachyonSize = in.readLong()
+ externalBlockStoreSize = in.readLong()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
new file mode 100644
index 0000000000..8964762df6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+/**
+ * An abstract class that a concrete external block manager has to extend.
+ * Implementations must have a no-argument constructor and are initialized by init,
+ * which is invoked by ExternalBlockStore. The main input parameter to all of the
+ * methods is a blockId, the unique identifier of a block within one Spark application.
+ *
+ * The underlying external block manager should avoid namespace conflicts among multiple
+ * Spark applications, for example by creating a different directory for each application
+ * using randomUUID.
+ *
+ */
+private[spark] abstract class ExternalBlockManager {
+
+ override def toString: String = {"External Block Store"}
+
+ /**
+ * Initialize a concrete block manager implementation. A subclass should set up its internal
+ * data structures, e.g., a file system client, in this function, which is invoked by
+ * ExternalBlockStore right after the class is constructed. It should throw IOException on failure.
+ *
+ * @throws java.io.IOException if there is any file system failure during the initialization.
+ */
+ def init(blockManager: BlockManager, executorId: String): Unit
+
+ /**
+ * Drop the block from the underlying external block store, if it exists.
+ * @return true on successfully removing the block
+ * false if the block could not be removed as it was not found
+ *
+ * @throws java.io.IOException if there is any file system failure in removing the block.
+ */
+ def removeBlock(blockId: BlockId): Boolean
+
+ /**
+ * Used by BlockManager to check the existence of the block in the underlying external
+ * block store.
+ * @return true if the block exists.
+ * false if the block does not exist.
+ *
+ * @throws java.io.IOException if there is any file system failure in checking
+ * the block existence.
+ */
+ def blockExists(blockId: BlockId): Boolean
+
+ /**
+ * Put the given block into the underlying external block store. Note that in the normal
+ * case, putting a block should never fail unless something goes wrong in the underlying
+ * external block store, e.g., a file system failure. In that case, an IOException
+ * should be thrown.
+ *
+ * @throws java.io.IOException if there is any file system failure in putting the block.
+ */
+ def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
+
+ /**
+ * Retrieve the block bytes.
+ * @return Some(ByteBuffer) if the block bytes are successfully retrieved
+ * None if the block does not exist in the external block store.
+ *
+ * @throws java.io.IOException if there is any file system failure in getting the block.
+ */
+ def getBytes(blockId: BlockId): Option[ByteBuffer]
+
+ /**
+ * Get the size of the block saved in the underlying external block store,
+ * which was written earlier by putBytes.
+ * @return size of the block
+ * 0 if the block does not exist
+ *
+ * @throws java.io.IOException if there is any file system failure in getting the block size.
+ */
+ def getSize(blockId: BlockId): Long
+
+ /**
+ * Clean up any information persisted in the underlying external block store,
+ * e.g., the directory and files. This is invoked by the shutdown hook of ExternalBlockStore
+ * during system shutdown.
+ *
+ */
+ def shutdown()
+}
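To make the contract above concrete, here is a hedged sketch of a toy ExternalBlockManager that keeps blocks in an in-process map. It is not part of this patch; a real implementation would talk to an external storage system. The class name is hypothetical, and it lives in org.apache.spark.storage only so it can see the private[spark] base class; it illustrates the no-argument constructor plus init() lifecycle that ExternalBlockStore relies on when instantiating the class by name.

```scala
package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.collection.concurrent.TrieMap

// Hypothetical, illustration-only implementation of the contract above.
private[spark] class InMemoryExternalBlockManager extends ExternalBlockManager {
  private val blocks = new TrieMap[BlockId, ByteBuffer]()

  override def init(blockManager: BlockManager, executorId: String): Unit = {
    // Nothing to set up for an in-process map; a real store would connect to
    // its backing service here and throw IOException if that fails.
  }

  override def removeBlock(blockId: BlockId): Boolean =
    blocks.remove(blockId).isDefined

  override def blockExists(blockId: BlockId): Boolean =
    blocks.contains(blockId)

  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
    // Copy the payload so later changes to the caller's buffer are not visible.
    val copy = ByteBuffer.allocate(bytes.remaining())
    copy.put(bytes.duplicate())
    copy.flip()
    blocks.put(blockId, copy)
  }

  override def getBytes(blockId: BlockId): Option[ByteBuffer] =
    blocks.get(blockId).map(_.duplicate())

  override def getSize(blockId: BlockId): Long =
    blocks.get(blockId).map(_.remaining().toLong).getOrElse(0L)

  override def shutdown(): Unit = blocks.clear()
}
```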
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
new file mode 100644
index 0000000000..0bf770306a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+import scala.util.control.NonFatal
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+
+/**
+ * Stores BlockManager blocks in the external block store.
+ * We catch any exception thrown by the underlying implementation
+ * and return the expected failure value instead.
+ */
+private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String)
+ extends BlockStore(blockManager: BlockManager) with Logging {
+
+ lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
+
+ logInfo("ExternalBlockStore started")
+
+ override def getSize(blockId: BlockId): Long = {
+ try {
+ externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in getSize from $blockId", t)
+ 0L
+ }
+ }
+
+ override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ putIntoExternalBlockStore(blockId, bytes, returnValues = true)
+ }
+
+ override def putArray(
+ blockId: BlockId,
+ values: Array[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult = {
+ putIterator(blockId, values.toIterator, level, returnValues)
+ }
+
+ override def putIterator(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult = {
+ logDebug(s"Attempting to write values for block $blockId")
+ val bytes = blockManager.dataSerialize(blockId, values)
+ putIntoExternalBlockStore(blockId, bytes, returnValues)
+ }
+
+ private def putIntoExternalBlockStore(
+ blockId: BlockId,
+ bytes: ByteBuffer,
+ returnValues: Boolean): PutResult = {
+ // Duplicate the buffer so that we do not modify the input offsets.
+ // duplicate() does not copy the underlying data, so it is inexpensive.
+ val byteBuffer = bytes.duplicate()
+ byteBuffer.rewind()
+ logDebug(s"Attempting to put block $blockId into ExtBlk store")
+ // We should never get here if externalBlockManager is None. Handle it anyway for safety.
+ try {
+ val startTime = System.currentTimeMillis
+ if (externalBlockManager.isDefined) {
+ externalBlockManager.get.putBytes(blockId, byteBuffer)
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+ blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+
+ if (returnValues) {
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ } else {
+ PutResult(bytes.limit(), null)
+ }
+ } else {
+ logError(s"error in putBytes $blockId")
+ PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ }
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in putBytes $blockId", t)
+ PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ }
+ }
+
+ // We assume the block is removed even if an exception is thrown
+ override def remove(blockId: BlockId): Boolean = {
+ try {
+ externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in removing $blockId", t)
+ true
+ }
+ }
+
+ override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ }
+
+ override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ try {
+ externalBlockManager.flatMap(_.getBytes(blockId))
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in getBytes from $blockId", t)
+ None
+ }
+ }
+
+ override def contains(blockId: BlockId): Boolean = {
+ try {
+ val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
+ if (!ret) {
+ logInfo(s"remove block $blockId")
+ blockManager.removeBlock(blockId, true)
+ }
+ ret
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in getBytes from $blockId", t)
+ false
+ }
+ }
+
+ private def addShutdownHook() {
+ Runtime.getRuntime.addShutdownHook(new Thread("ExternalBlockStore shutdown hook") {
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ logDebug("Shutdown hook called")
+ externalBlockManager.map(_.shutdown())
+ }
+ })
+ }
+
+ // Create concrete block manager and fall back to Tachyon by default for backward compatibility.
+ private def createBlkManager(): Option[ExternalBlockManager] = {
+ val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME)
+ .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
+
+ try {
+ val instance = Class.forName(clsName)
+ .newInstance()
+ .asInstanceOf[ExternalBlockManager]
+ instance.init(blockManager, executorId)
+ addShutdownHook();
+ Some(instance)
+ } catch {
+ case NonFatal(t) =>
+ logError("Cannot initialize external block store", t)
+ None
+ }
+ }
+}
+
+private[spark] object ExternalBlockStore extends Logging {
+ val MAX_DIR_CREATION_ATTEMPTS = 10
+ val SUB_DIRS_PER_DIR = "64"
+ val BASE_DIR = "spark.externalBlockStore.baseDir"
+ val FOLD_NAME = "spark.externalBlockStore.folderName"
+ val MASTER_URL = "spark.externalBlockStore.url"
+ val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
+ val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager"
+}
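The companion object above also shows how the concrete implementation is chosen: createBlkManager reads spark.externalBlockStore.blockManager and defaults to the Tachyon-backed manager. A hedged sketch of switching the plug-in by class name (the custom class below is hypothetical):

```scala
import org.apache.spark.SparkConf

// "com.example.MyExternalBlockManager" is a hypothetical class implementing
// ExternalBlockManager; when the key is unset, DEFAULT_BLOCK_MANAGER_NAME
// (org.apache.spark.storage.TachyonBlockManager) is used instead.
val conf = new SparkConf()
  .set("spark.externalBlockStore.blockManager", "com.example.MyExternalBlockManager")
```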
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 034525b56f..ad53a3edc7 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -32,16 +32,17 @@ class RDDInfo(
var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
- var tachyonSize = 0L
+ var externalBlockStoreSize = 0L
- def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+ def isCached: Boolean =
+ (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0
override def toString: String = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
- "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
+ "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
- bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
+ bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo): Int = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 134abea866..703bce3e6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -26,9 +26,9 @@ import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
- * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
- * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
- * multiple nodes.
+ * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
+ * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
+ * to replicate the RDD partitions on multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
@@ -126,7 +126,7 @@ class StorageLevel private(
var result = ""
result += (if (useDisk) "Disk " else "")
result += (if (useMemory) "Memory " else "")
- result += (if (useOffHeap) "Tachyon " else "")
+ result += (if (useOffHeap) "ExternalBlockStore " else "")
result += (if (deserialized) "Deserialized " else "Serialized ")
result += s"${replication}x Replicated"
result
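The useOffHeap flag in this description is what routes cached blocks through the external block store. A minimal sketch of exercising that path (assuming an existing SparkContext named sc and a reachable external block store):

```scala
import org.apache.spark.storage.StorageLevel

// OFF_HEAP is the built-in storage level with useOffHeap = true; persisting
// with it sends serialized partitions to the external block store.
val rdd = sc.parallelize(1 to 100000)
rdd.persist(StorageLevel.OFF_HEAP)
rdd.count()  // materializes the cached blocks
```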
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2bd6b749be..c4ac30092f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -199,33 +199,34 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
- val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
+ val changeInExternalBlockStore =
+ newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize
val level = newBlockStatus.storageLevel
// Compute new info from old info
- val (oldMem, oldDisk, oldTachyon) = blockId match {
+ val (oldMem, oldDisk, oldExternalBlockStore) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
+ .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) }
.getOrElse((0L, 0L, 0L))
case _ =>
_nonRddStorageInfo
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
- val newTachyon = math.max(oldTachyon + changeInTachyon, 0L)
+ val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L)
// Set the correct info
blockId match {
case RDDBlockId(rddId, _) =>
// If this RDD is no longer persisted, remove it
- if (newMem + newDisk + newTachyon == 0) {
+ if (newMem + newDisk + newExternalBlockStore == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level)
+ _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk, newTachyon)
+ _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore)
}
}
@@ -247,13 +248,13 @@ private[spark] object StorageUtils {
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
- val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
+ val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
rddInfo.storageLevel = storageLevel
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
- rddInfo.tachyonSize = tachyonSize
+ rddInfo.externalBlockStoreSize = externalBlockStoreSize
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index 583f1fdf04..bdc6276e41 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -17,13 +17,16 @@
package org.apache.spark.storage
+import java.io.IOException
+import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, Random}
+import com.google.common.io.ByteStreams
+import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
import tachyon.TachyonURI
-import tachyon.client.{TachyonFile, TachyonFS}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.Utils
@@ -32,32 +35,94 @@ import org.apache.spark.util.Utils
* Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
* default, one block is mapped to one file with a name given by its BlockId.
*
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
-private[spark] class TachyonBlockManager(
- blockManager: BlockManager,
- rootDirs: String,
- val master: String)
- extends Logging {
+private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
- val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
-
- if (client == null) {
- logError("Failed to connect to the Tachyon as the master address is not configured")
- System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE)
- }
-
- private val MAX_DIR_CREATION_ATTEMPTS = 10
- private val subDirsPerTachyonDir =
- blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt
+ var blockManager: BlockManager = _
+ var rootDirs: String = _
+ var master: String = _
+ var client: tachyon.client.TachyonFS = _
+ private var subDirsPerTachyonDir: Int = _
// Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
// then, inside this directory, create multiple subdirectories that we will hash files into,
// in order to avoid having really large inodes at the top level in Tachyon.
- private val tachyonDirs: Array[TachyonFile] = createTachyonDirs()
- private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+ private var tachyonDirs: Array[TachyonFile] = _
+ private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _
+
+
+ override def init(blockManager: BlockManager, executorId: String): Unit = {
+ this.blockManager = blockManager
+ val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
+ val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
+
+ rootDirs = s"$storeDir/$appFolderName/$executorId"
+ master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
+ client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
+ // The original implementation called System.exit; we throw so Spark can run without external block store support
+ if (client == null) {
+ logError("Failed to connect to the Tachyon as the master address is not configured")
+ throw new IOException("Failed to connect to the Tachyon as the master " +
+ "address is not configured")
+ }
+ subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories",
+ ExternalBlockStore.SUB_DIRS_PER_DIR).toInt
+
+ // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
+ // then, inside this directory, create multiple subdirectories that we will hash files into,
+ // in order to avoid having really large inodes at the top level in Tachyon.
+ tachyonDirs = createTachyonDirs()
+ subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+ tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+ }
+
+ override def toString: String = {"ExternalBlockStore-Tachyon"}
+
+ override def removeBlock(blockId: BlockId): Boolean = {
+ val file = getFile(blockId)
+ if (fileExists(file)) {
+ removeFile(file)
+ } else {
+ false
+ }
+ }
+
+ override def blockExists(blockId: BlockId): Boolean = {
+ val file = getFile(blockId)
+ fileExists(file)
+ }
- addShutdownHook()
+ override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
+ val file = getFile(blockId)
+ val os = file.getOutStream(WriteType.TRY_CACHE)
+ os.write(bytes.array())
+ os.close()
+ }
+
+ override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ val file = getFile(blockId)
+ if (file == null || file.getLocationHosts.size == 0) {
+ return None
+ }
+ val is = file.getInStream(ReadType.CACHE)
+ assert (is != null)
+ try {
+ val size = file.length
+ val bs = new Array[Byte](size.asInstanceOf[Int])
+ ByteStreams.readFully(is, bs)
+ Some(ByteBuffer.wrap(bs))
+ } catch {
+ case ioe: IOException =>
+ logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+ None
+ } finally {
+ is.close()
+ }
+ }
+
+ override def getSize(blockId: BlockId): Long = {
+ getFile(blockId.name).length
+ }
def removeFile(file: TachyonFile): Boolean = {
client.delete(new TachyonURI(file.getPath()), false)
@@ -109,7 +174,7 @@ private[spark] class TachyonBlockManager(
var tachyonDirId: String = null
var tries = 0
val rand = new Random()
- while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
+ while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) {
tries += 1
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
@@ -124,30 +189,27 @@ private[spark] class TachyonBlockManager(
}
}
if (!foundLocalDir) {
- logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
- rootDir)
- System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
+ logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS
+ + " attempts to create tachyon dir in " + rootDir)
+ System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR)
}
logInfo("Created tachyon directory at " + tachyonDir)
tachyonDir
}
}
- private def addShutdownHook() {
- tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
- Utils.addShutdownHook { () =>
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
- Utils.deleteRecursively(tachyonDir, client)
- }
- } catch {
- case e: Exception =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+ override def shutdown() {
+ logDebug("Shutdown hook called")
+ tachyonDirs.foreach { tachyonDir =>
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ Utils.deleteRecursively(tachyonDir, client)
}
+ } catch {
+ case e: Exception =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
- client.close()
}
+ client.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
deleted file mode 100644
index 65fa81704c..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import tachyon.client.TachyonFile
-
-/**
- * References a particular segment of a file (potentially the entire file), based off an offset and
- * a length.
- */
-private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
- override def toString: String = {
- "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
deleted file mode 100644
index 233d1e2b7c..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-
-import com.google.common.io.ByteStreams
-import tachyon.client.{ReadType, WriteType}
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Stores BlockManager blocks on Tachyon.
- */
-private[spark] class TachyonStore(
- blockManager: BlockManager,
- tachyonManager: TachyonBlockManager)
- extends BlockStore(blockManager: BlockManager) with Logging {
-
- logInfo("TachyonStore started")
-
- override def getSize(blockId: BlockId): Long = {
- tachyonManager.getFile(blockId.name).length
- }
-
- override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
- putIntoTachyonStore(blockId, bytes, returnValues = true)
- }
-
- override def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIterator(blockId, values.toIterator, level, returnValues)
- }
-
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- logDebug(s"Attempting to write values for block $blockId")
- val bytes = blockManager.dataSerialize(blockId, values)
- putIntoTachyonStore(blockId, bytes, returnValues)
- }
-
- private def putIntoTachyonStore(
- blockId: BlockId,
- bytes: ByteBuffer,
- returnValues: Boolean): PutResult = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val byteBuffer = bytes.duplicate()
- byteBuffer.rewind()
- logDebug(s"Attempting to put block $blockId into Tachyon")
- val startTime = System.currentTimeMillis
- val file = tachyonManager.getFile(blockId)
- val os = file.getOutStream(WriteType.TRY_CACHE)
- os.write(byteBuffer.array())
- os.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
- blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
-
- if (returnValues) {
- PutResult(bytes.limit(), Right(bytes.duplicate()))
- } else {
- PutResult(bytes.limit(), null)
- }
- }
-
- override def remove(blockId: BlockId): Boolean = {
- val file = tachyonManager.getFile(blockId)
- if (tachyonManager.fileExists(file)) {
- tachyonManager.removeFile(file)
- } else {
- false
- }
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
-
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = tachyonManager.getFile(blockId)
- if (file == null || file.getLocationHosts.size == 0) {
- return None
- }
- val is = file.getInStream(ReadType.CACHE)
- assert (is != null)
- try {
- val size = file.length
- val bs = new Array[Byte](size.asInstanceOf[Int])
- ByteStreams.readFully(is, bs)
- Some(ByteBuffer.wrap(bs))
- } catch {
- case ioe: IOException =>
- logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
- None
- } finally {
- is.close()
- }
- }
-
- override def contains(blockId: BlockId): Boolean = {
- val file = tachyonManager.getFile(blockId)
- tachyonManager.fileExists(file)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 6ced6052d2..59dc6b547c 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -42,7 +42,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
- "Size in Tachyon",
+ "Size in ExternalBlockStore",
"Size on Disk")
/** Render an HTML row representing an RDD */
@@ -59,7 +59,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
<td>{rdd.numCachedPartitions}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
- <td sorttable_customkey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td>
+ <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
<td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
// scalastyle:on
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 474f79fb75..44d274956d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -373,14 +373,14 @@ private[spark] object JsonProtocol {
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
- ("Tachyon Size" -> rddInfo.tachyonSize) ~
+ ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~
("Disk Size" -> rddInfo.diskSize)
}
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
- ("Use Tachyon" -> storageLevel.useOffHeap) ~
+ ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
@@ -389,7 +389,7 @@ private[spark] object JsonProtocol {
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
("Storage Level" -> storageLevel) ~
("Memory Size" -> blockStatus.memSize) ~
- ("Tachyon Size" -> blockStatus.tachyonSize) ~
+ ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~
("Disk Size" -> blockStatus.diskSize)
}
@@ -787,13 +787,15 @@ private[spark] object JsonProtocol {
val numPartitions = (json \ "Number of Partitions").extract[Int]
val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
val memSize = (json \ "Memory Size").extract[Long]
- val tachyonSize = (json \ "Tachyon Size").extract[Long]
+ // Fall back to the old Tachyon key for backward compatibility
+ val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
+ .getOrElse(json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
- rddInfo.tachyonSize = tachyonSize
+ rddInfo.externalBlockStoreSize = externalBlockStoreSize
rddInfo.diskSize = diskSize
rddInfo
}
@@ -801,18 +803,22 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
- val useTachyon = (json \ "Use Tachyon").extract[Boolean]
+ // Fall back to the old Tachyon key for backward compatibility
+ val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome
+ .getOrElse(json \ "Use Tachyon").extract[Boolean]
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
- StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication)
+ StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication)
}
def blockStatusFromJson(json: JValue): BlockStatus = {
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val memorySize = (json \ "Memory Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
- val tachyonSize = (json \ "Tachyon Size").extract[Long]
- BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
+ // Fall back to the old Tachyon key for backward compatibility
+ val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
+ .getOrElse(json \ "Tachyon Size").extract[Long]
+ BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize)
}
def executorInfoFromJson(json: JValue): ExecutorInfo = {
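A hedged sketch of the backward-compatibility fallback in action: a block status written by an older Spark with the legacy "Tachyon Size" / "Use Tachyon" keys still parses, because the reader falls back to the old key when the new one is absent. This assumes it runs somewhere the private[spark] JsonProtocol is visible, e.g. in a test under org.apache.spark.

```scala
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.util.JsonProtocol

val legacyJson = parse(
  """{
    |  "Storage Level": {
    |    "Use Disk": true, "Use Memory": true, "Use Tachyon": false,
    |    "Deserialized": true, "Replication": 1
    |  },
    |  "Memory Size": 400, "Tachyon Size": 0, "Disk Size": 500
    |}""".stripMargin)

val status = JsonProtocol.blockStatusFromJson(legacyJson)
assert(status.externalBlockStoreSize == 0L)  // value read from the legacy key
```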
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f5b410f41d..151955ef7f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -526,6 +526,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
test("tachyon storage") {
// TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
+ conf.set(ExternalBlockStore.BLOCK_MANAGER_NAME, ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
if (tachyonUnitTestEnabled) {
store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index ef5c55f91c..ee1071cbcd 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -240,11 +240,11 @@ class StorageSuite extends FunSuite {
assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
}
- test("storage status memUsed, diskUsed, tachyonUsed") {
+ test("storage status memUsed, diskUsed, externalBlockStoreUsed") {
val status = storageStatus2
def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
- def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
+ def actualOffHeapUsed: Long = status.blocks.values.map(_.externalBlockStoreSize).sum
assert(status.memUsed === actualMemUsed)
assert(status.diskUsed === actualDiskUsed)
assert(status.offHeapUsed === actualOffHeapUsed)
@@ -300,12 +300,12 @@ class StorageSuite extends FunSuite {
assert(rddInfos(0).numCachedPartitions === 5)
assert(rddInfos(0).memSize === 5L)
assert(rddInfos(0).diskSize === 10L)
- assert(rddInfos(0).tachyonSize === 0L)
+ assert(rddInfos(0).externalBlockStoreSize === 0L)
assert(rddInfos(1).storageLevel === memAndDisk)
assert(rddInfos(1).numCachedPartitions === 3)
assert(rddInfos(1).memSize === 3L)
assert(rddInfos(1).diskSize === 6L)
- assert(rddInfos(1).tachyonSize === 0L)
+ assert(rddInfos(1).externalBlockStoreSize === 0L)
}
test("StorageUtils.getRddBlockLocations") {
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 3744e479d2..0ea9ea47b4 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -133,12 +133,12 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
assert(storageListener._rddInfoMap(0).memSize === 800L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
assert(storageListener._rddInfoMap(0).isCached)
assert(storageListener._rddInfoMap(1).memSize === 0L)
assert(storageListener._rddInfoMap(1).diskSize === 240L)
- assert(storageListener._rddInfoMap(1).tachyonSize === 0L)
+ assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L)
assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
assert(storageListener._rddInfoMap(1).isCached)
assert(!storageListener._rddInfoMap(2).isCached)
@@ -155,7 +155,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
assert(storageListener._rddInfoMap(0).memSize === 400L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
assert(storageListener._rddInfoMap(0).isCached)
assert(!storageListener._rddInfoMap(1).isCached)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2be724254..2d039cb75a 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -785,14 +785,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 501
| }
| ],
@@ -969,12 +969,12 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1052,12 +1052,12 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1135,12 +1135,12 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1168,14 +1168,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 500
| }
| ],
@@ -1207,14 +1207,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1000
| },
| {
@@ -1223,14 +1223,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1001
| }
| ],
@@ -1262,14 +1262,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1500
| },
| {
@@ -1278,14 +1278,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1501
| },
| {
@@ -1294,14 +1294,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1502
| }
| ],
@@ -1333,14 +1333,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2000
| },
| {
@@ -1349,14 +1349,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2001
| },
| {
@@ -1365,14 +1365,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2002
| },
| {
@@ -1381,14 +1381,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2003
| }
| ],