author      Zhan Zhang <zhazhan@gmail.com>    2015-04-30 22:24:31 -0700
committer   Patrick Wendell <patrick@databricks.com>    2015-04-30 22:24:31 -0700
commit      36a7a6807ee58c8ba309c0e09e547919246b7f82 (patch)
tree        25863218dad04112ec03b08a83d91a7ec10a0b3b /core
parent      b5347a4664625ede6ab9d8ef6558457a34ae423f (diff)
[SPARK-6479] [BLOCK MANAGER] Create off-heap block storage API
This adds the classes for an off-heap block storage API, and includes the migration of Tachyon onto it. The diff looks large, but it mostly just renames tachyon to offheap. A new implementation for HDFS will be submitted for review in SPARK-6112.

Author: Zhan Zhang <zhazhan@gmail.com>

Closes #5430 from zhzhan/SPARK-6479 and squashes the following commits:

60acd84 [Zhan Zhang] minor change to kick off the test
12f54c9 [Zhan Zhang] solve merge conflicts
a54132c [Zhan Zhang] solve review comments
ffb8e00 [Zhan Zhang] rebase to sparkcontext change
6e121e0 [Zhan Zhang] resolve review comments and restructure blockmanager code
a7aed6c [Zhan Zhang] add Tachyon migration code
186de31 [Zhan Zhang] initial commit for off-heap block storage api
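The patch renames the spark.tachyonStore.* configuration namespace to spark.externalBlockStore.*, with Tachyon remaining the default backend behind the new pluggable ExternalBlockManager interface. As a rough orientation before the diff, the following is a minimal sketch of the application-side configuration after this change; the keys are taken from the ExternalBlockStore object added below, and the values are placeholders rather than recommendations:

    import org.apache.spark.SparkConf

    // Sketch only: keys come from ExternalBlockStore in this patch; values are placeholders.
    // They supersede the old spark.tachyonStore.* keys.
    val conf = new SparkConf()
      .set("spark.externalBlockStore.baseDir", "/tmp_spark_tachyon")
      .set("spark.externalBlockStore.url", "tachyon://localhost:19998")
      .set("spark.externalBlockStore.subDirectories", "64")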
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 63
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala | 102
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala | 181
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/RDDInfo.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala | 138
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala | 30
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonStore.scala | 128
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 56
21 files changed, 521 insertions(+), 324 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bae951f388..fe24260cdb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
- // Generate the random name for a temp folder in Tachyon
+ // Generate the random name for a temp folder in external block store.
// Add a timestamp as the suffix here to make it more safe
- val tachyonFolderName = "spark-" + randomUUID.toString()
+ val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
+ @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
+ val tachyonFolderName = externalBlockStoreFolderName
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
@@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
+ _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 52862ae0ca..ea36fb60bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -33,11 +33,11 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
- /** TachyonStore failed to initialize after many attempts. */
- val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
+ /** ExternalBlockStore failed to initialize after many attempts. */
+ val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54
- /** TachyonStore failed to create a local temporary directory after many attempts. */
- val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
+ /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
+ val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
def explainExitCode(exitCode: Int): String = {
exitCode match {
@@ -46,9 +46,11 @@ object ExecutorExitCode {
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
- case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
- case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
- "TachyonStore failed to create a local temporary directory."
+ // TODO: replace external block store with concrete implementation name
+ case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
+ // TODO: replace external block store with concrete implementation name
+ case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
+ "ExternalBlockStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 330255f892..31c07c73fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
- " CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
+ " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
info.numCachedPartitions, bytesToString(info.memSize),
- bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
+ bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
s"$rdd [$persistence]" +: storageInfo
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 55718e584c..402ee1c764 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -78,19 +78,11 @@ private[spark] class BlockManager(
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
// Actual storage of where blocks are kept
- private var tachyonInitialized = false
+ private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
- private[spark] lazy val tachyonStore: TachyonStore = {
- val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
- val appFolderName = conf.get("spark.tachyonStore.folderName")
- val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
- val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
- val tachyonBlockManager =
- new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
- tachyonInitialized = true
- new TachyonStore(this, tachyonBlockManager)
- }
+ private[spark] lazy val externalBlockStore: ExternalBlockStore =
+ new ExternalBlockStore(this, executorId)
private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -320,13 +312,13 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
+ * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
- // Assume that block is not in Tachyon
+ // Assume that block is not in external block store
BlockStatus(info.level, memSize, diskSize, 0L)
}
}
@@ -376,10 +368,10 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val inTachyonSize = status.tachyonSize
+ val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
master.updateBlockInfo(
- blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
+ blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
} else {
true
}
@@ -397,15 +389,17 @@ private[spark] class BlockManager(
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
- val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
+ val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
- val replication = if (inMem || inTachyon || onDisk) level.replication else 1
- val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
+ val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+ val storageLevel =
+ StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
- val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
+ val externalBlockStoreSize =
+ if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
+ BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
}
}
}
@@ -485,11 +479,11 @@ private[spark] class BlockManager(
}
}
- // Look for the block in Tachyon
+ // Look for the block in external block store
if (level.useOffHeap) {
- logDebug(s"Getting block $blockId from tachyon")
- if (tachyonStore.contains(blockId)) {
- tachyonStore.getBytes(blockId) match {
+ logDebug(s"Getting block $blockId from ExternalBlockStore")
+ if (externalBlockStore.contains(blockId)) {
+ externalBlockStore.getBytes(blockId) match {
case Some(bytes) =>
if (!asBlockResult) {
return Some(bytes)
@@ -498,7 +492,7 @@ private[spark] class BlockManager(
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
- logDebug(s"Block $blockId not found in tachyon")
+ logDebug(s"Block $blockId not found in externalBlockStore")
}
}
}
@@ -766,8 +760,8 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
- // Use tachyon for off-heap storage
- (false, tachyonStore)
+ // Use external block store
+ (false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
@@ -802,7 +796,7 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
- // Now that the block is in either the memory, tachyon, or disk store,
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
@@ -1099,10 +1093,11 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
- val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
- if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
+ val removedFromExternalBlockStore =
+ if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
+ if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
- "the disk, memory, or tachyon store")
+ "the disk, memory, or external block store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
@@ -1136,7 +1131,7 @@ private[spark] class BlockManager(
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
- if (level.useOffHeap) { tachyonStore.remove(id) }
+ if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
@@ -1216,8 +1211,8 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
- if (tachyonInitialized) {
- tachyonStore.clear()
+ if (externalBlockStoreInitialized) {
+ externalBlockStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 9bfc4201d3..a85e1c7632 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,9 +54,10 @@ class BlockManagerMaster(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long): Boolean = {
+ externalBlockStoreSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel,
+ memSize, diskSize, externalBlockStoreSize))
logDebug(s"Updated info of block $blockId")
res
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 7212362df5..3afb4c3c02 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
context.reply(true)
case UpdateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
+ blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
+ blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long): Boolean = {
+ externalBlockStoreSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
}
blockManagerInfo(blockManagerId).updateBlockInfo(
- blockId, storageLevel, memSize, diskSize, tachyonSize)
+ blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long) {
- def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+ externalBlockStoreSize: Long) {
+ def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
}
@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long) {
+ externalBlockStoreSize: Long) {
updateLastSeenMs()
@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
}
if (storageLevel.isValid) {
- /* isValid means it is either stored in-memory, on-disk or on-Tachyon.
+ /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
* The memSize here indicates the data size in or dropped from memory,
- * tachyonSize here indicates the data size in or dropped from Tachyon,
+ * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
- logInfo("Added %s on tachyon on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
+ _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+ logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
@@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
if (blockStatus.storageLevel.useOffHeap) {
- logInfo("Removed %s on %s on tachyon (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
+ logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
+ blockId, blockManagerId.hostPort,
+ Utils.bytesToString(blockStatus.externalBlockStoreSize)))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index f89d8d7493..1683576067 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
var storageLevel: StorageLevel,
var memSize: Long,
var diskSize: Long,
- var tachyonSize: Long)
+ var externalBlockStoreSize: Long)
extends ToBlockManagerMaster
with Externalizable {
@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
- out.writeLong(tachyonSize)
+ out.writeLong(externalBlockStoreSize)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
- tachyonSize = in.readLong()
+ externalBlockStoreSize = in.readLong()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
new file mode 100644
index 0000000000..8964762df6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+/**
+ * An abstract class that the concrete external block manager has to inherit.
+ * The class has to have a no-argument constructor, and will be initialized by init,
+ * which is invoked by ExternalBlockStore. The main input parameter is blockId for all
+ * the methods, which is the unique identifier for Block in one Spark application.
+ *
+ * The underlying external block manager should avoid any namespace conflicts among multiple
+ * Spark applications, for example by creating a different directory for each application
+ * via randomUUID.
+ *
+ */
+private[spark] abstract class ExternalBlockManager {
+
+ override def toString: String = {"External Block Store"}
+
+ /**
+ * Initialize a concrete block manager implementation. The subclass should initialize its
+ * internal data structures, e.g., the file system, in this function, which is invoked by
+ * ExternalBlockStore right after the class is constructed. It should throw IOException on failure.
+ *
+ * @throws java.io.IOException if there is any file system failure during the initialization.
+ */
+ def init(blockManager: BlockManager, executorId: String): Unit
+
+ /**
+ * Drop the block from the underlying external block store, if it exists.
+ * @return true on successfully removing the block
+ * false if the block could not be removed as it was not found
+ *
+ * @throws java.io.IOException if there is any file system failure in removing the block.
+ */
+ def removeBlock(blockId: BlockId): Boolean
+
+ /**
+ * Used by BlockManager to check the existence of the block in the underlying external
+ * block store.
+ * @return true if the block exists.
+ * false if the block does not exist.
+ *
+ * @throws java.io.IOException if there is any file system failure in checking
+ * the block existence.
+ */
+ def blockExists(blockId: BlockId): Boolean
+
+ /**
+ * Put the given block into the underlying external block store. Note that in the normal case,
+ * putting a block should never fail unless something goes wrong with the underlying
+ * external block store, e.g., a file system failure. In that case, an IOException
+ * should be thrown.
+ *
+ * @throws java.io.IOException if there is any file system failure in putting the block.
+ */
+ def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
+
+ /**
+ * Retrieve the block bytes.
+ * @return Some(ByteBuffer) if the block bytes are successfully retrieved
+ * None if the block does not exist in the external block store.
+ *
+ * @throws java.io.IOException if there is any file system failure in getting the block.
+ */
+ def getBytes(blockId: BlockId): Option[ByteBuffer]
+
+ /**
+ * Get the size of the block saved in the underlying external block store,
+ * which is saved before by putBytes.
+ * @return size of the block
+ * 0 if the block does not exist
+ *
+ * @throws java.io.IOException if there is any file system failure in getting the block size.
+ */
+ def getSize(blockId: BlockId): Long
+
+ /**
+ * Clean up any information persisted in the underlying external block store,
+ * e.g., directories and files. This is invoked by the shutdown hook of ExternalBlockStore
+ * during system shutdown.
+ *
+ */
+ def shutdown()
+}
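To make the contract above concrete, here is a minimal in-memory sketch of an ExternalBlockManager implementation. It is purely illustrative and not part of this patch (the class name InMemoryExternalBlockManager is hypothetical); a real backend, such as the HDFS one planned for SPARK-6112, would persist the bytes in an external system rather than a local map:

    package org.apache.spark.storage

    import java.nio.ByteBuffer

    import scala.collection.concurrent.TrieMap

    // Hypothetical sketch of the ExternalBlockManager contract; not part of this patch.
    private[spark] class InMemoryExternalBlockManager extends ExternalBlockManager {

      private val blocks = new TrieMap[BlockId, ByteBuffer]()

      override def init(blockManager: BlockManager, executorId: String): Unit = {
        // Nothing to set up for the in-memory sketch.
      }

      override def removeBlock(blockId: BlockId): Boolean = blocks.remove(blockId).isDefined

      override def blockExists(blockId: BlockId): Boolean = blocks.contains(blockId)

      override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
        // Copy the bytes so later changes to the caller's buffer do not leak into the store.
        val copy = ByteBuffer.allocate(bytes.remaining())
        copy.put(bytes.duplicate())
        copy.flip()
        blocks.put(blockId, copy)
      }

      override def getBytes(blockId: BlockId): Option[ByteBuffer] =
        blocks.get(blockId).map(_.duplicate())

      override def getSize(blockId: BlockId): Long =
        blocks.get(blockId).map(_.remaining().toLong).getOrElse(0L)

      override def shutdown(): Unit = blocks.clear()
    }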
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
new file mode 100644
index 0000000000..0bf770306a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import scala.util.control.NonFatal
+
+
+/**
+ * Stores BlockManager blocks on ExternalBlockStore.
+ * We capture any potential exception from the underlying implementation
+ * and return the expected failure value.
+ */
+private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String)
+ extends BlockStore(blockManager: BlockManager) with Logging {
+
+ lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
+
+ logInfo("ExternalBlockStore started")
+
+ override def getSize(blockId: BlockId): Long = {
+ try {
+ externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in getSize from $blockId", t)
+ 0L
+ }
+ }
+
+ override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ putIntoExternalBlockStore(blockId, bytes, returnValues = true)
+ }
+
+ override def putArray(
+ blockId: BlockId,
+ values: Array[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult = {
+ putIterator(blockId, values.toIterator, level, returnValues)
+ }
+
+ override def putIterator(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult = {
+ logDebug(s"Attempting to write values for block $blockId")
+ val bytes = blockManager.dataSerialize(blockId, values)
+ putIntoExternalBlockStore(blockId, bytes, returnValues)
+ }
+
+ private def putIntoExternalBlockStore(
+ blockId: BlockId,
+ bytes: ByteBuffer,
+ returnValues: Boolean): PutResult = {
+ // So that we do not modify the input offsets !
+ // duplicate does not copy buffer, so inexpensive
+ val byteBuffer = bytes.duplicate()
+ byteBuffer.rewind()
+ logDebug(s"Attempting to put block $blockId into ExtBlk store")
+ // We should never get here if externalBlockManager is None. Handle it anyway for safety.
+ try {
+ val startTime = System.currentTimeMillis
+ if (externalBlockManager.isDefined) {
+ externalBlockManager.get.putBytes(blockId, bytes)
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+ blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+
+ if (returnValues) {
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ } else {
+ PutResult(bytes.limit(), null)
+ }
+ } else {
+ logError(s"error in putBytes $blockId")
+ PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ }
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in putBytes $blockId", t)
+ PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+ }
+ }
+
+ // We assume the block is removed even if an exception is thrown
+ override def remove(blockId: BlockId): Boolean = {
+ try {
+ externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in removing $blockId", t)
+ true
+ }
+ }
+
+ override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ }
+
+ override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ try {
+ externalBlockManager.flatMap(_.getBytes(blockId))
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in getBytes from $blockId", t)
+ None
+ }
+ }
+
+ override def contains(blockId: BlockId): Boolean = {
+ try {
+ val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
+ if (!ret) {
+ logInfo(s"remove block $blockId")
+ blockManager.removeBlock(blockId, true)
+ }
+ ret
+ } catch {
+ case NonFatal(t) =>
+ logError(s"error in getBytes from $blockId", t)
+ false
+ }
+ }
+
+ private def addShutdownHook() {
+ Runtime.getRuntime.addShutdownHook(new Thread("ExternalBlockStore shutdown hook") {
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ logDebug("Shutdown hook called")
+ externalBlockManager.map(_.shutdown())
+ }
+ })
+ }
+
+ // Create concrete block manager and fall back to Tachyon by default for backward compatibility.
+ private def createBlkManager(): Option[ExternalBlockManager] = {
+ val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME)
+ .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
+
+ try {
+ val instance = Class.forName(clsName)
+ .newInstance()
+ .asInstanceOf[ExternalBlockManager]
+ instance.init(blockManager, executorId)
+ addShutdownHook();
+ Some(instance)
+ } catch {
+ case NonFatal(t) =>
+ logError("Cannot initialize external block store", t)
+ None
+ }
+ }
+}
+
+private[spark] object ExternalBlockStore extends Logging {
+ val MAX_DIR_CREATION_ATTEMPTS = 10
+ val SUB_DIRS_PER_DIR = "64"
+ val BASE_DIR = "spark.externalBlockStore.baseDir"
+ val FOLD_NAME = "spark.externalBlockStore.folderName"
+ val MASTER_URL = "spark.externalBlockStore.url"
+ val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
+ val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager"
+}
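ExternalBlockStore.createBlkManager above instantiates the class named by spark.externalBlockStore.blockManager reflectively, falling back to TachyonBlockManager for backward compatibility; if construction or init fails, the error is logged and externalBlockManager stays None, so reads return None and writes report an empty block status instead of crashing the executor. A hedged sketch of wiring in a custom manager (the class name refers to the hypothetical in-memory example shown earlier, not to anything in this patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Sketch only: the block manager class below is the hypothetical example from above.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("custom-external-block-store")
      .set("spark.externalBlockStore.blockManager",
        "org.apache.spark.storage.InMemoryExternalBlockManager")

    val sc = new SparkContext(conf)
    // Blocks persisted OFF_HEAP are routed through the configured ExternalBlockManager.
    sc.parallelize(1 to 100).persist(StorageLevel.OFF_HEAP).count()
    sc.stop()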
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 034525b56f..ad53a3edc7 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -32,16 +32,17 @@ class RDDInfo(
var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
- var tachyonSize = 0L
+ var externalBlockStoreSize = 0L
- def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+ def isCached: Boolean =
+ (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0
override def toString: String = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
- "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
+ "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
- bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
+ bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo): Int = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 134abea866..703bce3e6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -26,9 +26,9 @@ import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
- * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
- * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
- * multiple nodes.
+ * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
+ * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
+ * to replicate the RDD partitions on multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
@@ -126,7 +126,7 @@ class StorageLevel private(
var result = ""
result += (if (useDisk) "Disk " else "")
result += (if (useMemory) "Memory " else "")
- result += (if (useOffHeap) "Tachyon " else "")
+ result += (if (useOffHeap) "ExternalBlockStore " else "")
result += (if (deserialized) "Deserialized " else "Serialized ")
result += s"${replication}x Replicated"
result
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2bd6b749be..c4ac30092f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -199,33 +199,34 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
- val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
+ val changeInExternalBlockStore =
+ newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize
val level = newBlockStatus.storageLevel
// Compute new info from old info
- val (oldMem, oldDisk, oldTachyon) = blockId match {
+ val (oldMem, oldDisk, oldExternalBlockStore) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
+ .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) }
.getOrElse((0L, 0L, 0L))
case _ =>
_nonRddStorageInfo
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
- val newTachyon = math.max(oldTachyon + changeInTachyon, 0L)
+ val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L)
// Set the correct info
blockId match {
case RDDBlockId(rddId, _) =>
// If this RDD is no longer persisted, remove it
- if (newMem + newDisk + newTachyon == 0) {
+ if (newMem + newDisk + newExternalBlockStore == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level)
+ _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk, newTachyon)
+ _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore)
}
}
@@ -247,13 +248,13 @@ private[spark] object StorageUtils {
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
- val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
+ val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
rddInfo.storageLevel = storageLevel
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
- rddInfo.tachyonSize = tachyonSize
+ rddInfo.externalBlockStoreSize = externalBlockStoreSize
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index 583f1fdf04..bdc6276e41 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -17,13 +17,16 @@
package org.apache.spark.storage
+import java.io.IOException
+import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, Random}
+import com.google.common.io.ByteStreams
+import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
import tachyon.TachyonURI
-import tachyon.client.{TachyonFile, TachyonFS}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.Utils
@@ -32,32 +35,94 @@ import org.apache.spark.util.Utils
* Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
* default, one block is mapped to one file with a name given by its BlockId.
*
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
-private[spark] class TachyonBlockManager(
- blockManager: BlockManager,
- rootDirs: String,
- val master: String)
- extends Logging {
+private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
- val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
-
- if (client == null) {
- logError("Failed to connect to the Tachyon as the master address is not configured")
- System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE)
- }
-
- private val MAX_DIR_CREATION_ATTEMPTS = 10
- private val subDirsPerTachyonDir =
- blockManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt
+ var blockManager: BlockManager =_
+ var rootDirs: String = _
+ var master: String = _
+ var client: tachyon.client.TachyonFS = _
+ private var subDirsPerTachyonDir: Int = _
// Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
// then, inside this directory, create multiple subdirectories that we will hash files into,
// in order to avoid having really large inodes at the top level in Tachyon.
- private val tachyonDirs: Array[TachyonFile] = createTachyonDirs()
- private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+ private var tachyonDirs: Array[TachyonFile] = _
+ private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _
+
+
+ override def init(blockManager: BlockManager, executorId: String): Unit = {
+ this.blockManager = blockManager
+ val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
+ val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
+
+ rootDirs = s"$storeDir/$appFolderName/$executorId"
+ master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
+ client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
+ // The original implementation called System.exit; we throw instead so Spark can run without external block store support.
+ if (client == null) {
+ logError("Failed to connect to the Tachyon as the master address is not configured")
+ throw new IOException("Failed to connect to the Tachyon as the master " +
+ "address is not configured")
+ }
+ subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories",
+ ExternalBlockStore.SUB_DIRS_PER_DIR).toInt
+
+ // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
+ // then, inside this directory, create multiple subdirectories that we will hash files into,
+ // in order to avoid having really large inodes at the top level in Tachyon.
+ tachyonDirs = createTachyonDirs()
+ subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+ tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+ }
+
+ override def toString: String = {"ExternalBlockStore-Tachyon"}
+
+ override def removeBlock(blockId: BlockId): Boolean = {
+ val file = getFile(blockId)
+ if (fileExists(file)) {
+ removeFile(file)
+ } else {
+ false
+ }
+ }
+
+ override def blockExists(blockId: BlockId): Boolean = {
+ val file = getFile(blockId)
+ fileExists(file)
+ }
- addShutdownHook()
+ override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
+ val file = getFile(blockId)
+ val os = file.getOutStream(WriteType.TRY_CACHE)
+ os.write(bytes.array())
+ os.close()
+ }
+
+ override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ val file = getFile(blockId)
+ if (file == null || file.getLocationHosts.size == 0) {
+ return None
+ }
+ val is = file.getInStream(ReadType.CACHE)
+ assert (is != null)
+ try {
+ val size = file.length
+ val bs = new Array[Byte](size.asInstanceOf[Int])
+ ByteStreams.readFully(is, bs)
+ Some(ByteBuffer.wrap(bs))
+ } catch {
+ case ioe: IOException =>
+ logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+ None
+ } finally {
+ is.close()
+ }
+ }
+
+ override def getSize(blockId: BlockId): Long = {
+ getFile(blockId.name).length
+ }
def removeFile(file: TachyonFile): Boolean = {
client.delete(new TachyonURI(file.getPath()), false)
@@ -109,7 +174,7 @@ private[spark] class TachyonBlockManager(
var tachyonDirId: String = null
var tries = 0
val rand = new Random()
- while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
+ while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) {
tries += 1
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
@@ -124,30 +189,27 @@ private[spark] class TachyonBlockManager(
}
}
if (!foundLocalDir) {
- logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
- rootDir)
- System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
+ logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS
+ + " attempts to create tachyon dir in " + rootDir)
+ System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR)
}
logInfo("Created tachyon directory at " + tachyonDir)
tachyonDir
}
}
- private def addShutdownHook() {
- tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
- Utils.addShutdownHook { () =>
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
- Utils.deleteRecursively(tachyonDir, client)
- }
- } catch {
- case e: Exception =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+ override def shutdown() {
+ logDebug("Shutdown hook called")
+ tachyonDirs.foreach { tachyonDir =>
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ Utils.deleteRecursively(tachyonDir, client)
}
+ } catch {
+ case e: Exception =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
- client.close()
}
+ client.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
deleted file mode 100644
index 65fa81704c..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import tachyon.client.TachyonFile
-
-/**
- * References a particular segment of a file (potentially the entire file), based off an offset and
- * a length.
- */
-private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
- override def toString: String = {
- "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
deleted file mode 100644
index 233d1e2b7c..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-
-import com.google.common.io.ByteStreams
-import tachyon.client.{ReadType, WriteType}
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Stores BlockManager blocks on Tachyon.
- */
-private[spark] class TachyonStore(
- blockManager: BlockManager,
- tachyonManager: TachyonBlockManager)
- extends BlockStore(blockManager: BlockManager) with Logging {
-
- logInfo("TachyonStore started")
-
- override def getSize(blockId: BlockId): Long = {
- tachyonManager.getFile(blockId.name).length
- }
-
- override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
- putIntoTachyonStore(blockId, bytes, returnValues = true)
- }
-
- override def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIterator(blockId, values.toIterator, level, returnValues)
- }
-
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- logDebug(s"Attempting to write values for block $blockId")
- val bytes = blockManager.dataSerialize(blockId, values)
- putIntoTachyonStore(blockId, bytes, returnValues)
- }
-
- private def putIntoTachyonStore(
- blockId: BlockId,
- bytes: ByteBuffer,
- returnValues: Boolean): PutResult = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val byteBuffer = bytes.duplicate()
- byteBuffer.rewind()
- logDebug(s"Attempting to put block $blockId into Tachyon")
- val startTime = System.currentTimeMillis
- val file = tachyonManager.getFile(blockId)
- val os = file.getOutStream(WriteType.TRY_CACHE)
- os.write(byteBuffer.array())
- os.close()
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
- blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
-
- if (returnValues) {
- PutResult(bytes.limit(), Right(bytes.duplicate()))
- } else {
- PutResult(bytes.limit(), null)
- }
- }
-
- override def remove(blockId: BlockId): Boolean = {
- val file = tachyonManager.getFile(blockId)
- if (tachyonManager.fileExists(file)) {
- tachyonManager.removeFile(file)
- } else {
- false
- }
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
-
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = tachyonManager.getFile(blockId)
- if (file == null || file.getLocationHosts.size == 0) {
- return None
- }
- val is = file.getInStream(ReadType.CACHE)
- assert (is != null)
- try {
- val size = file.length
- val bs = new Array[Byte](size.asInstanceOf[Int])
- ByteStreams.readFully(is, bs)
- Some(ByteBuffer.wrap(bs))
- } catch {
- case ioe: IOException =>
- logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
- None
- } finally {
- is.close()
- }
- }
-
- override def contains(blockId: BlockId): Boolean = {
- val file = tachyonManager.getFile(blockId)
- tachyonManager.fileExists(file)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 6ced6052d2..59dc6b547c 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -42,7 +42,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
- "Size in Tachyon",
+ "Size in ExternalBlockStore",
"Size on Disk")
/** Render an HTML row representing an RDD */
@@ -59,7 +59,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
<td>{rdd.numCachedPartitions}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
- <td sorttable_customkey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td>
+ <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
<td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
// scalastyle:on
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 474f79fb75..44d274956d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -373,14 +373,14 @@ private[spark] object JsonProtocol {
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
- ("Tachyon Size" -> rddInfo.tachyonSize) ~
+ ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~
("Disk Size" -> rddInfo.diskSize)
}
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
- ("Use Tachyon" -> storageLevel.useOffHeap) ~
+ ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
@@ -389,7 +389,7 @@ private[spark] object JsonProtocol {
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
("Storage Level" -> storageLevel) ~
("Memory Size" -> blockStatus.memSize) ~
- ("Tachyon Size" -> blockStatus.tachyonSize) ~
+ ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~
("Disk Size" -> blockStatus.diskSize)
}
@@ -787,13 +787,15 @@ private[spark] object JsonProtocol {
val numPartitions = (json \ "Number of Partitions").extract[Int]
val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
val memSize = (json \ "Memory Size").extract[Long]
- val tachyonSize = (json \ "Tachyon Size").extract[Long]
+ // fall back to the Tachyon key for backward compatibility
+ val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
+ .getOrElse(json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
- rddInfo.tachyonSize = tachyonSize
+ rddInfo.externalBlockStoreSize = externalBlockStoreSize
rddInfo.diskSize = diskSize
rddInfo
}
@@ -801,18 +803,22 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
- val useTachyon = (json \ "Use Tachyon").extract[Boolean]
+ // fall back to the Tachyon key for backward compatibility
+ val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome
+ .getOrElse(json \ "Use Tachyon").extract[Boolean]
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
- StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication)
+ StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication)
}
def blockStatusFromJson(json: JValue): BlockStatus = {
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val memorySize = (json \ "Memory Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
- val tachyonSize = (json \ "Tachyon Size").extract[Long]
- BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
+ // fall back to the Tachyon key for backward compatibility
+ val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
+ .getOrElse(json \ "Tachyon Size").extract[Long]
+ BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize)
}
def executorInfoFromJson(json: JValue): ExecutorInfo = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f5b410f41d..151955ef7f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -526,6 +526,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
test("tachyon storage") {
// TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
+ conf.set(ExternalBlockStore.BLOCK_MANAGER_NAME, ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
if (tachyonUnitTestEnabled) {
store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index ef5c55f91c..ee1071cbcd 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -240,11 +240,11 @@ class StorageSuite extends FunSuite {
assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
}
- test("storage status memUsed, diskUsed, tachyonUsed") {
+ test("storage status memUsed, diskUsed, externalBlockStoreUsed") {
val status = storageStatus2
def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
- def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
+ def actualOffHeapUsed: Long = status.blocks.values.map(_.externalBlockStoreSize).sum
assert(status.memUsed === actualMemUsed)
assert(status.diskUsed === actualDiskUsed)
assert(status.offHeapUsed === actualOffHeapUsed)
@@ -300,12 +300,12 @@ class StorageSuite extends FunSuite {
assert(rddInfos(0).numCachedPartitions === 5)
assert(rddInfos(0).memSize === 5L)
assert(rddInfos(0).diskSize === 10L)
- assert(rddInfos(0).tachyonSize === 0L)
+ assert(rddInfos(0).externalBlockStoreSize === 0L)
assert(rddInfos(1).storageLevel === memAndDisk)
assert(rddInfos(1).numCachedPartitions === 3)
assert(rddInfos(1).memSize === 3L)
assert(rddInfos(1).diskSize === 6L)
- assert(rddInfos(1).tachyonSize === 0L)
+ assert(rddInfos(1).externalBlockStoreSize === 0L)
}
test("StorageUtils.getRddBlockLocations") {
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 3744e479d2..0ea9ea47b4 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -133,12 +133,12 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
assert(storageListener._rddInfoMap(0).memSize === 800L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
assert(storageListener._rddInfoMap(0).isCached)
assert(storageListener._rddInfoMap(1).memSize === 0L)
assert(storageListener._rddInfoMap(1).diskSize === 240L)
- assert(storageListener._rddInfoMap(1).tachyonSize === 0L)
+ assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L)
assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
assert(storageListener._rddInfoMap(1).isCached)
assert(!storageListener._rddInfoMap(2).isCached)
@@ -155,7 +155,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
assert(storageListener._rddInfoMap(0).memSize === 400L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
assert(storageListener._rddInfoMap(0).isCached)
assert(!storageListener._rddInfoMap(1).isCached)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2be724254..2d039cb75a 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -785,14 +785,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 501
| }
| ],
@@ -969,12 +969,12 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1052,12 +1052,12 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1135,12 +1135,12 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1168,14 +1168,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 500
| }
| ],
@@ -1207,14 +1207,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1000
| },
| {
@@ -1223,14 +1223,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1001
| }
| ],
@@ -1262,14 +1262,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1500
| },
| {
@@ -1278,14 +1278,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1501
| },
| {
@@ -1294,14 +1294,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 1502
| }
| ],
@@ -1333,14 +1333,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2000
| },
| {
@@ -1349,14 +1349,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2001
| },
| {
@@ -1365,14 +1365,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2002
| },
| {
@@ -1381,14 +1381,14 @@ class JsonProtocolSuite extends FunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use Tachyon": false,
+ | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,
- | "Tachyon Size": 0,
+ | "ExternalBlockStore Size": 0,
| "Disk Size": 2003
| }
| ],