diff options
author | Haoyuan Li <haoyuan@cs.berkeley.edu> | 2014-04-04 20:36:24 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-04 20:38:20 -0700 |
commit | b50ddfde0342990979979e58348f54c10b500c90 (patch) | |
tree | cc7fa4d089375cded5056d9a93079e0b23a32ae7 /core/src | |
parent | 1347ebd4b52ffb9197fc4137a55dff6badb149ba (diff) | |
download | spark-b50ddfde0342990979979e58348f54c10b500c90.tar.gz spark-b50ddfde0342990979979e58348f54c10b500c90.tar.bz2 spark-b50ddfde0342990979979e58348f54c10b500c90.zip |
SPARK-1305: Support persisting RDD's directly to Tachyon
Move the PR#468 of apache-incubator-spark to the apache-spark
"Adding an option to persist Spark RDD blocks into Tachyon."
Author: Haoyuan Li <haoyuan@cs.berkeley.edu>
Author: RongGu <gurongwalker@gmail.com>
Closes #158 from RongGu/master and squashes the following commits:
72b7768 [Haoyuan Li] merge master
9f7fa1b [Haoyuan Li] fix code style
ae7834b [Haoyuan Li] minor cleanup
a8b3ec6 [Haoyuan Li] merge master branch
e0f4891 [Haoyuan Li] better check offheap.
55b5918 [RongGu] address matei's comment on the replication of offHeap storagelevel
7cd4600 [RongGu] remove some logic code for tachyonstore's replication
51149e7 [RongGu] address aaron's comment on returning value of the remove() function in tachyonstore
8adfcfa [RongGu] address arron's comment on inTachyonSize
120e48a [RongGu] changed the root-level dir name in Tachyon
5cc041c [Haoyuan Li] address aaron's comments
9b97935 [Haoyuan Li] address aaron's comments
d9a6438 [Haoyuan Li] fix for pspark
77d2703 [Haoyuan Li] change python api.git status
3dcace4 [Haoyuan Li] address matei's comments
91fa09d [Haoyuan Li] address patrick's comments
589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE
64348b2 [Haoyuan Li] update conf docs.
ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1
619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore
be79d77 [RongGu] find a way to clean up some unnecessay metods and classed to make the code simpler
49cc724 [Haoyuan Li] update docs with off_headp option
4572f9f [RongGu] reserving the old apply function API of StorageLevel
04301d3 [RongGu] rename StorageLevel.TACHYON to Storage.OFF_HEAP
c9aeabf [RongGu] rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP
76805aa [RongGu] unifies the config properties name prefix; add the configs into docs/configuration.md
e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments
fd84156 [RongGu] use randomUUID to generate sparkapp directory name on tachyon;minor code style fix
939e467 [Haoyuan Li] 0.4.1-thrift from maven central
86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to download it. temporarily revert it back to 0.4.1
16c5798 [RongGu] make the dependency on tachyon as tachyon-0.4.1-thrift
eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem
6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
d827250 [RongGu] fix JsonProtocolSuie test failure
716e93b [Haoyuan Li] revert the version
ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift
2825a13 [RongGu] up-merging to the current master branch of the apache spark
6a22c1a [Haoyuan Li] fix scalastyle
8968b67 [Haoyuan Li] exclude more libraries from tachyon dependency to be the same as referencing tachyon-client.
77be7e8 [RongGu] address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice.
1dcadf9 [Haoyuan Li] typo
bf278fa [Haoyuan Li] fix python tests
e82909c [Haoyuan Li] minor cleanup
776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR
8859371 [Haoyuan Li] various minor fixes and clean up
e3ddbba [Haoyuan Li] add doc to use Tachyon cache mode.
fcaeab2 [Haoyuan Li] address Aaron's comment
e554b1e [Haoyuan Li] add python code
47304b3 [Haoyuan Li] make tachyonStore in BlockMananger lazy val; add more comments StorageLevels.
dc8ef24 [Haoyuan Li] add old storelevel constructor
e01a271 [Haoyuan Li] update tachyon 0.4.1
8011a96 [RongGu] fix a brought-in mistake in StorageLevel
70ca182 [RongGu] a bit change in comment
556978b [RongGu] fix the scalastyle errors
791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." move the PR#468 of apache-incubator-spark to the apache-spark
Diffstat (limited to 'core/src')
19 files changed, 643 insertions, 100 deletions
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java index 9f13b39909..840a1bd93b 100644 --- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java +++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java @@ -23,17 +23,18 @@ import org.apache.spark.storage.StorageLevel; * Expose some commonly useful storage level constants. */ public class StorageLevels { - public static final StorageLevel NONE = create(false, false, false, 1); - public static final StorageLevel DISK_ONLY = create(true, false, false, 1); - public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2); - public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1); - public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2); - public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1); - public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2); - public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1); - public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2); - public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1); - public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2); + public static final StorageLevel NONE = create(false, false, false, false, 1); + public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1); + public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2); + public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1); + public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2); + public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1); + public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2); + public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1); + public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2); + public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1); + public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2); + public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1); /** * Create a new StorageLevel object. @@ -42,7 +43,26 @@ public class StorageLevels { * @param deserialized saved as deserialized objects, if true * @param replication replication factor */ - public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) { - return StorageLevel.apply(useDisk, useMemory, deserialized, replication); + @Deprecated + public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, + int replication) { + return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication); + } + + /** + * Create a new StorageLevel object. + * @param useDisk saved to disk, if true + * @param useMemory saved to memory, if true + * @param useOffHeap saved to Tachyon, if true + * @param deserialized saved as deserialized objects, if true + * @param replication replication factor + */ + public static StorageLevel create( + boolean useDisk, + boolean useMemory, + boolean useOffHeap, + boolean deserialized, + int replication) { + return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication); } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 835cffe37a..fcf16ce1b2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -19,14 +19,13 @@ package org.apache.spark import java.io._ import java.net.URI -import java.util.{Properties, UUID} import java.util.concurrent.atomic.AtomicInteger - +import java.util.{Properties, UUID} +import java.util.UUID.randomUUID import scala.collection.{Map, Set} import scala.collection.generic.Growable import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.{ClassTag, classTag} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} @@ -130,6 +129,11 @@ class SparkContext( val master = conf.get("spark.master") val appName = conf.get("spark.app.name") + // Generate the random name for a temp folder in Tachyon + // Add a timestamp as the suffix here to make it more safe + val tachyonFolderName = "spark-" + randomUUID.toString() + conf.set("spark.tachyonStore.folderName", tachyonFolderName) + val isLocal = (master == "local" || master.startsWith("local[")) if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3486092a14..16887d8892 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? - executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) + executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, + false) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -105,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend { // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, + sparkHostPort, cores), name = "Executor") workerUrl.foreach{ url => actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index 210f3dbeeb..ceff3a067d 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -41,6 +41,12 @@ object ExecutorExitCode { /** DiskStore failed to create a local temporary directory after many attempts. */ val DISK_STORE_FAILED_TO_CREATE_DIR = 53 + /** TachyonStore failed to initialize after many attempts. */ + val TACHYON_STORE_FAILED_TO_INITIALIZE = 54 + + /** TachyonStore failed to create a local temporary directory after many attempts. */ + val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55 + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -48,6 +54,9 @@ object ExecutorExitCode { case OOM => "OutOfMemoryError" case DISK_STORE_FAILED_TO_CREATE_DIR => "Failed to create local directory (bad spark.local.dir?)" + case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize." + case TACHYON_STORE_FAILED_TO_CREATE_DIR => + "TachyonStore failed to create a local temporary directory." case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 71584b6eb1..19138d9dde 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,22 +19,20 @@ package org.apache.spark.storage import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} - import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random - import akka.actor.{ActorSystem, Cancellable, Props} import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import sun.nio.ch.DirectBuffer - import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.util._ + sealed trait Values case class ByteBufferValues(buffer: ByteBuffer) extends Values @@ -59,6 +57,17 @@ private[spark] class BlockManager( private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore = new DiskStore(this, diskBlockManager) + var tachyonInitialized = false + private[storage] lazy val tachyonStore: TachyonStore = { + val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") + val appFolderName = conf.get("spark.tachyonStore.folderName") + val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}" + val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") + val tachyonBlockManager = new TachyonBlockManager( + shuffleBlockManager, tachyonStorePath, tachyonMaster) + tachyonInitialized = true + new TachyonStore(this, tachyonBlockManager) + } // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private val nettyPort: Int = { @@ -248,8 +257,10 @@ private[spark] class BlockManager( if (info.tellMaster) { val storageLevel = status.storageLevel val inMemSize = Math.max(status.memSize, droppedMemorySize) + val inTachyonSize = status.tachyonSize val onDiskSize = status.diskSize - master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) + master.updateBlockInfo( + blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize) } else true } @@ -259,22 +270,24 @@ private[spark] class BlockManager( * and the updated in-memory and on-disk sizes. */ private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { - val (newLevel, inMemSize, onDiskSize) = info.synchronized { + val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) + val inTachyon = level.useOffHeap && tachyonStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val deserialized = if (inMem) level.deserialized else false - val replication = if (inMem || onDisk) level.replication else 1 - val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication) + val replication = if (inMem || inTachyon || onDisk) level.replication else 1 + val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L + val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - (storageLevel, memSize, diskSize) + (storageLevel, memSize, diskSize, tachyonSize) } } - BlockStatus(newLevel, inMemSize, onDiskSize) + BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize) } /** @@ -354,6 +367,24 @@ private[spark] class BlockManager( logDebug("Block " + blockId + " not found in memory") } } + + // Look for the block in Tachyon + if (level.useOffHeap) { + logDebug("Getting block " + blockId + " from tachyon") + if (tachyonStore.contains(blockId)) { + tachyonStore.getBytes(blockId) match { + case Some(bytes) => { + if (!asValues) { + return Some(bytes) + } else { + return Some(dataDeserialize(blockId, bytes)) + } + } + case None => + logDebug("Block " + blockId + " not found in tachyon") + } + } + } // Look for block on disk, potentially storing it back into memory if required: if (level.useDisk) { @@ -620,6 +651,23 @@ private[spark] class BlockManager( } // Keep track of which blocks are dropped from memory res.droppedBlocks.foreach { block => updatedBlocks += block } + } else if (level.useOffHeap) { + // Save to Tachyon. + val res = data match { + case IteratorValues(iterator) => + tachyonStore.putValues(blockId, iterator, level, false) + case ArrayBufferValues(array) => + tachyonStore.putValues(blockId, array, level, false) + case ByteBufferValues(bytes) => { + bytes.rewind(); + tachyonStore.putBytes(blockId, bytes, level) + } + } + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case _ => + } } else { // Save directly to disk. // Don't get back the bytes unless we replicate them. @@ -644,8 +692,8 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { - // Now that the block is in either the memory or disk store, let other threads read it, - // and tell the master about it. + // Now that the block is in either the memory, tachyon, or disk store, + // let other threads read it, and tell the master about it. marked = true putBlockInfo.markReady(size) if (tellMaster) { @@ -707,7 +755,8 @@ private[spark] class BlockManager( */ var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { - val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) + val tLevel = StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) if (cachedPeers == null) { cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } @@ -832,9 +881,10 @@ private[spark] class BlockManager( // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) - if (!removedFromMemory && !removedFromDisk) { + val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { logWarning("Block " + blockId + " could not be removed as it was not found in either " + - "the disk or memory store") + "the disk, memory, or tachyon store") } blockInfo.remove(blockId) if (tellMaster && info.tellMaster) { @@ -871,6 +921,9 @@ private[spark] class BlockManager( if (level.useDisk) { diskStore.remove(id) } + if (level.useOffHeap) { + tachyonStore.remove(id) + } iterator.remove() logInfo("Dropped block " + id) } @@ -946,6 +999,9 @@ private[spark] class BlockManager( blockInfo.clear() memoryStore.clear() diskStore.clear() + if (tachyonInitialized) { + tachyonStore.clear() + } metadataCleaner.cancel() broadcastCleaner.cancel() logInfo("BlockManager stopped") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ed6937851b..4bc1b407ad 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): Boolean = { + diskSize: Long, + tachyonSize: Long): Boolean = { val res = askDriverWithReply[Boolean]( - UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) + UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)) logInfo("Updated info of block " + blockId) res } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index ff2652b640..378f4cadc1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -73,10 +73,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus register(blockManagerId, maxMemSize, slaveActor) sender ! true - case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + case UpdateBlockInfo( + blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) => // TODO: Ideally we want to handle all the message replies in receive instead of in the // individual private methods. - updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) + updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) case GetLocations(blockId) => sender ! getLocations(blockId) @@ -246,7 +247,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + diskSize: Long, + tachyonSize: Long) { if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.executorId == "<driver>" && !isLocal) { @@ -265,7 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus return } - blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) + blockManagerInfo(blockManagerId).updateBlockInfo( + blockId, storageLevel, memSize, diskSize, tachyonSize) var locations: mutable.HashSet[BlockManagerId] = null if (blockLocations.containsKey(blockId)) { @@ -309,8 +312,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } - -private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) +private[spark] case class BlockStatus( + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long, + tachyonSize: Long) private[spark] class BlockManagerInfo( val blockManagerId: BlockManagerId, @@ -336,7 +342,8 @@ private[spark] class BlockManagerInfo( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + diskSize: Long, + tachyonSize: Long) { updateLastSeenMs() @@ -350,23 +357,29 @@ private[spark] class BlockManagerInfo( } if (storageLevel.isValid) { - /* isValid means it is either stored in-memory or on-disk. + /* isValid means it is either stored in-memory, on-disk or on-Tachyon. * But the memSize here indicates the data size in or dropped from memory, + * tachyonSize here indicates the data size in or dropped from Tachyon, * and the diskSize here indicates the data size in or dropped to disk. * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */ if (storageLevel.useMemory) { - _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0)) + _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0)) _remainingMem -= memSize logInfo("Added %s in memory on %s (size: %s, free: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { - _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize)) + _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0)) logInfo("Added %s on disk on %s (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } + if (storageLevel.useOffHeap) { + _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize)) + logInfo("Added %s on tachyon on %s (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize))) + } } else if (_blocks.containsKey(blockId)) { // If isValid is not true, drop the block. val blockStatus: BlockStatus = _blocks.get(blockId) @@ -381,6 +394,10 @@ private[spark] class BlockManagerInfo( logInfo("Removed %s on %s on disk (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) } + if (blockStatus.storageLevel.useOffHeap) { + logInfo("Removed %s on %s on tachyon (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize))) + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index bbb9529b5a..8a36b5cc42 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -53,11 +53,12 @@ private[storage] object BlockManagerMessages { var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, - var diskSize: Long) + var diskSize: Long, + var tachyonSize: Long) extends ToBlockManagerMaster with Externalizable { - def this() = this(null, null, null, 0, 0) // For deserialization only + def this() = this(null, null, null, 0, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput) { blockManagerId.writeExternal(out) @@ -65,6 +66,7 @@ private[storage] object BlockManagerMessages { storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) + out.writeLong(tachyonSize) } override def readExternal(in: ObjectInput) { @@ -73,6 +75,7 @@ private[storage] object BlockManagerMessages { storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() + tachyonSize = in.readLong() } } @@ -81,13 +84,15 @@ private[storage] object BlockManagerMessages { blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): UpdateBlockInfo = { - new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) + diskSize: Long, + tachyonSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize) } // For pattern-matching - def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, BlockId, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + def unapply(h: UpdateBlockInfo) + : Option[(BlockManagerId, BlockId, StorageLevel, Long, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize, h.tachyonSize)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4212a539da..95e71de2d3 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -21,8 +21,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} /** * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, - * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory - * in a serialized format, and whether to replicate the RDD partitions on multiple nodes. + * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to + * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on + * multiple nodes. * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants * for commonly useful storage levels. To create your own storage level object, use the * factory method of the singleton object (`StorageLevel(...)`). @@ -30,45 +31,58 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, + private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1) extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. private def this(flags: Int, replication: Int) { - this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) + this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) } - def this() = this(false, true, false) // For deserialization + def this() = this(false, true, false, false) // For deserialization def useDisk = useDisk_ def useMemory = useMemory_ + def useOffHeap = useOffHeap_ def deserialized = deserialized_ def replication = replication_ assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes") + if (useOffHeap) { + require(useDisk == false, "Off-heap storage level does not support using disk") + require(useMemory == false, "Off-heap storage level does not support using heap memory") + require(deserialized == false, "Off-heap storage level does not support deserialized storage") + require(replication == 1, "Off-heap storage level does not support multiple replication") + } + override def clone(): StorageLevel = new StorageLevel( - this.useDisk, this.useMemory, this.deserialized, this.replication) + this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication) override def equals(other: Any): Boolean = other match { case s: StorageLevel => s.useDisk == useDisk && s.useMemory == useMemory && + s.useOffHeap == useOffHeap && s.deserialized == deserialized && s.replication == replication case _ => false } - def isValid = ((useMemory || useDisk) && (replication > 0)) + def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0)) def toInt: Int = { var ret = 0 if (useDisk_) { - ret |= 4 + ret |= 8 } if (useMemory_) { + ret |= 4 + } + if (useOffHeap_) { ret |= 2 } if (deserialized_) { @@ -84,8 +98,9 @@ class StorageLevel private( override def readExternal(in: ObjectInput) { val flags = in.readByte() - useDisk_ = (flags & 4) != 0 - useMemory_ = (flags & 2) != 0 + useDisk_ = (flags & 8) != 0 + useMemory_ = (flags & 4) != 0 + useOffHeap_ = (flags & 2) != 0 deserialized_ = (flags & 1) != 0 replication_ = in.readByte() } @@ -93,14 +108,15 @@ class StorageLevel private( @throws(classOf[IOException]) private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this) - override def toString: String = - "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) + override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format( + useDisk, useMemory, useOffHeap, deserialized, replication) override def hashCode(): Int = toInt * 41 + replication def description : String = { var result = "" result += (if (useDisk) "Disk " else "") result += (if (useMemory) "Memory " else "") + result += (if (useOffHeap) "Tachyon " else "") result += (if (deserialized) "Deserialized " else "Serialized ") result += "%sx Replicated".format(replication) result @@ -113,22 +129,28 @@ class StorageLevel private( * new storage levels. */ object StorageLevel { - val NONE = new StorageLevel(false, false, false) - val DISK_ONLY = new StorageLevel(true, false, false) - val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) - val MEMORY_ONLY = new StorageLevel(false, true, true) - val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) - val MEMORY_ONLY_SER = new StorageLevel(false, true, false) - val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) - val MEMORY_AND_DISK = new StorageLevel(true, true, true) - val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) - val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) - val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) + val NONE = new StorageLevel(false, false, false, false) + val DISK_ONLY = new StorageLevel(true, false, false, false) + val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) + val MEMORY_ONLY = new StorageLevel(false, true, false, true) + val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) + val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) + val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) + val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) + val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) + val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) + val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) + val OFF_HEAP = new StorageLevel(false, false, true, false) + + /** Create a new StorageLevel object without setting useOffHeap */ + def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, + deserialized: Boolean, replication: Int) = getCachedStorageLevel( + new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)) /** Create a new StorageLevel object */ - def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, - replication: Int = 1): StorageLevel = - getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication)) + def apply(useDisk: Boolean, useMemory: Boolean, + deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel( + new StorageLevel(useDisk, useMemory, false, deserialized, replication)) /** Create a new StorageLevel object from its integer representation */ def apply(flags: Int, replication: Int): StorageLevel = diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 26565f56ad..7a17495903 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -44,7 +44,7 @@ private[spark] class StorageStatusListener extends SparkListener { storageStatusList.foreach { storageStatus => val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId) unpersistedBlocksIds.foreach { blockId => - storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L) + storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 6153dfe0b7..ff6e84cf98 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -48,17 +48,23 @@ class StorageStatus( } private[spark] -class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel) - extends Ordered[RDDInfo] { +class RDDInfo( + val id: Int, + val name: String, + val numPartitions: Int, + val storageLevel: StorageLevel) extends Ordered[RDDInfo] { var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L + var tachyonSize= 0L override def toString = { - ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " + - "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions, - numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize)) + import Utils.bytesToString + ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" + + "TachyonSize: %s; DiskSize: %s").format( + name, id, storageLevel.toString, numCachedPartitions, numPartitions, + bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize)) } override def compare(that: RDDInfo) = { @@ -105,14 +111,17 @@ object StorageUtils { val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => - // Add up memory and disk sizes - val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 } + // Add up memory, disk and Tachyon sizes + val persistedBlocks = + blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 } val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L) val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L) + val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L) rddInfoMap.get(rddId).map { rddInfo => rddInfo.numCachedPartitions = persistedBlocks.length rddInfo.memSize = memSize rddInfo.diskSize = diskSize + rddInfo.tachyonSize = tachyonSize rddInfo } }.toArray diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala new file mode 100644 index 0000000000..b0b9674856 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.text.SimpleDateFormat +import java.util.{Date, Random} + +import tachyon.client.TachyonFS +import tachyon.client.TachyonFile + +import org.apache.spark.Logging +import org.apache.spark.executor.ExecutorExitCode +import org.apache.spark.network.netty.ShuffleSender +import org.apache.spark.util.Utils + + +/** + * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By + * default, one block is mapped to one file with a name given by its BlockId. + * + * @param rootDirs The directories to use for storing block files. Data will be hashed among these. + */ +private[spark] class TachyonBlockManager( + shuffleManager: ShuffleBlockManager, + rootDirs: String, + val master: String) + extends Logging { + + val client = if (master != null && master != "") TachyonFS.get(master) else null + + if (client == null) { + logError("Failed to connect to the Tachyon as the master address is not configured") + System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE) + } + + private val MAX_DIR_CREATION_ATTEMPTS = 10 + private val subDirsPerTachyonDir = + shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt + + // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; + // then, inside this directory, create multiple subdirectories that we will hash files into, + // in order to avoid having really large inodes at the top level in Tachyon. + private val tachyonDirs: Array[TachyonFile] = createTachyonDirs() + private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) + + addShutdownHook() + + def removeFile(file: TachyonFile): Boolean = { + client.delete(file.getPath(), false) + } + + def fileExists(file: TachyonFile): Boolean = { + client.exist(file.getPath()) + } + + def getFile(filename: String): TachyonFile = { + // Figure out which tachyon directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % tachyonDirs.length + val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir + + // Create the subdirectory if it doesn't already exist + var subDir = subDirs(dirId)(subDirId) + if (subDir == null) { + subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + client.mkdir(path) + val newDir = client.getFile(path) + subDirs(dirId)(subDirId) = newDir + newDir + } + } + } + val filePath = subDir + "/" + filename + if(!client.exist(filePath)) { + client.createFile(filePath) + } + val file = client.getFile(filePath) + file + } + + def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name) + + // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. + private def createTachyonDirs(): Array[TachyonFile] = { + logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") + val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") + rootDirs.split(",").map { rootDir => + var foundLocalDir = false + var tachyonDir: TachyonFile = null + var tachyonDirId: String = null + var tries = 0 + val rand = new Random() + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) + val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId + if (!client.exist(path)) { + foundLocalDir = client.mkdir(path) + tachyonDir = client.getFile(path) + } + } catch { + case e: Exception => + logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) + } + } + if (!foundLocalDir) { + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + + rootDir) + System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) + } + logInfo("Created tachyon directory at " + tachyonDir) + tachyonDir + } + } + + private def addShutdownHook() { + tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") { + override def run() { + logDebug("Shutdown hook called") + tachyonDirs.foreach { tachyonDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { + Utils.deleteRecursively(tachyonDir, client) + } + } catch { + case t: Throwable => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, t) + } + } + } + }) + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala new file mode 100644 index 0000000000..b86abbda1d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import tachyon.client.TachyonFile + +/** + * References a particular segment of a file (potentially the entire file), based off an offset and + * a length. + */ +private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) { + override def toString = "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length) +} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala new file mode 100644 index 0000000000..c37e76f893 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.IOException +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer + +import tachyon.client.{WriteType, ReadType} + +import org.apache.spark.Logging +import org.apache.spark.util.Utils +import org.apache.spark.serializer.Serializer + + +private class Entry(val size: Long) + + +/** + * Stores BlockManager blocks on Tachyon. + */ +private class TachyonStore( + blockManager: BlockManager, + tachyonManager: TachyonBlockManager) + extends BlockStore(blockManager: BlockManager) with Logging { + + logInfo("TachyonStore started") + + override def getSize(blockId: BlockId): Long = { + tachyonManager.getFile(blockId.name).length + } + + override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = { + putToTachyonStore(blockId, bytes, true) + } + + override def putValues( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + return putValues(blockId, values.toIterator, level, returnValues) + } + + override def putValues( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + logDebug("Attempting to write values for block " + blockId) + val _bytes = blockManager.dataSerialize(blockId, values) + putToTachyonStore(blockId, _bytes, returnValues) + } + + private def putToTachyonStore( + blockId: BlockId, + bytes: ByteBuffer, + returnValues: Boolean): PutResult = { + // So that we do not modify the input offsets ! + // duplicate does not copy buffer, so inexpensive + val byteBuffer = bytes.duplicate() + byteBuffer.rewind() + logDebug("Attempting to put block " + blockId + " into Tachyon") + val startTime = System.currentTimeMillis + val file = tachyonManager.getFile(blockId) + val os = file.getOutStream(WriteType.TRY_CACHE) + os.write(byteBuffer.array()) + os.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored as %s file in Tachyon in %d ms".format( + blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime))) + + if (returnValues) { + PutResult(bytes.limit(), Right(bytes.duplicate())) + } else { + PutResult(bytes.limit(), null) + } + } + + override def remove(blockId: BlockId): Boolean = { + val file = tachyonManager.getFile(blockId) + if (tachyonManager.fileExists(file)) { + tachyonManager.removeFile(file) + } else { + false + } + } + + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) + } + + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = tachyonManager.getFile(blockId) + if (file == null || file.getLocationHosts().size == 0) { + return None + } + val is = file.getInStream(ReadType.CACHE) + var buffer: ByteBuffer = null + try { + if (is != null) { + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) + buffer = ByteBuffer.wrap(bs) + if (fetchSize != size) { + logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size + + " is not equal to fetched size " + fetchSize) + return None + } + } + } catch { + case ioe: IOException => { + logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe) + return None + } + } + Some(buffer) + } + + override def contains(blockId: BlockId): Boolean = { + val file = tachyonManager.getFile(blockId) + tachyonManager.fileExists(file) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index b2732de510..0fa461e5e9 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { + val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage) @@ -45,6 +46,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { "Cached Partitions", "Fraction Cached", "Size in Memory", + "Size in Tachyon", "Size on Disk") /** Render an HTML row representing an RDD */ @@ -60,6 +62,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { <td>{rdd.numCachedPartitions}</td> <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td> <td>{Utils.bytesToString(rdd.memSize)}</td> + <td>{Utils.bytesToString(rdd.tachyonSize)}</td> <td>{Utils.bytesToString(rdd.diskSize)}</td> </tr> } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d9a6af6187..2155a8888c 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -274,12 +274,14 @@ private[spark] object JsonProtocol { ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ ("Memory Size" -> rddInfo.memSize) ~ + ("Tachyon Size" -> rddInfo.tachyonSize) ~ ("Disk Size" -> rddInfo.diskSize) } def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ + ("Use Tachyon" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -288,6 +290,7 @@ private[spark] object JsonProtocol { val storageLevel = storageLevelToJson(blockStatus.storageLevel) ("Storage Level" -> storageLevel) ~ ("Memory Size" -> blockStatus.memSize) ~ + ("Tachyon Size" -> blockStatus.tachyonSize) ~ ("Disk Size" -> blockStatus.diskSize) } @@ -570,11 +573,13 @@ private[spark] object JsonProtocol { val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] + val tachyonSize = (json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize + rddInfo.tachyonSize = tachyonSize rddInfo.diskSize = diskSize rddInfo } @@ -582,16 +587,18 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] + val useTachyon = (json \ "Use Tachyon").extract[Boolean] val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, deserialized, replication) + StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication) } def blockStatusFromJson(json: JValue): BlockStatus = { val storageLevel = storageLevelFromJson(json \ "Storage Level") val memorySize = (json \ "Memory Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - BlockStatus(storageLevel, memorySize, diskSize) + val tachyonSize = (json \ "Tachyon Size").extract[Long] + BlockStatus(storageLevel, memorySize, diskSize, tachyonSize) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 737b765e2a..d3c39dee33 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -34,11 +34,13 @@ import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.json4s._ +import tachyon.client.{TachyonFile,TachyonFS} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} + /** * Various utility methods used by Spark. */ @@ -153,6 +155,7 @@ private[spark] object Utils extends Logging { } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() + private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { @@ -162,6 +165,14 @@ private[spark] object Utils extends Logging { } } + // Register the tachyon path to be deleted via shutdown hook + def registerShutdownDeleteDir(tachyonfile: TachyonFile) { + val absolutePath = tachyonfile.getPath() + shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths += absolutePath + } + } + // Is the path already registered to be deleted via a shutdown hook ? def hasShutdownDeleteDir(file: File): Boolean = { val absolutePath = file.getAbsolutePath() @@ -170,6 +181,14 @@ private[spark] object Utils extends Logging { } } + // Is the path already registered to be deleted via a shutdown hook ? + def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { + val absolutePath = file.getPath() + shutdownDeletePaths.synchronized { + shutdownDeletePaths.contains(absolutePath) + } + } + // Note: if file is child of some registered path, while not equal to it, then return true; // else false. This is to ensure that two shutdown hooks do not try to delete each others // paths - resulting in IOException and incomplete cleanup. @@ -186,6 +205,22 @@ private[spark] object Utils extends Logging { retval } + // Note: if file is child of some registered path, while not equal to it, then return true; + // else false. This is to ensure that two shutdown hooks do not try to delete each others + // paths - resulting in Exception and incomplete cleanup. + def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = { + val absolutePath = file.getPath() + val retval = shutdownDeletePaths.synchronized { + shutdownDeletePaths.find { path => + !absolutePath.equals(path) && absolutePath.startsWith(path) + }.isDefined + } + if (retval) { + logInfo("path = " + file + ", already present as root for deletion.") + } + retval + } + /** Create a temporary directory inside the given parent directory */ def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { var attempts = 0 @@ -541,7 +576,16 @@ private[spark] object Utils extends Logging { } /** - * Check to see if file is a symbolic link. + * Delete a file or directory and its contents recursively. + */ + def deleteRecursively(dir: TachyonFile, client: TachyonFS) { + if (!client.delete(dir.getPath(), true)) { + throw new IOException("Failed to delete the tachyon dir: " + dir) + } + } + + /** + * Check to see if file is a symbolic link. */ def isSymlink(file: File): Boolean = { if (file == null) throw new NullPointerException("File must not be null") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e83cd55e73..b6dd052610 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -96,9 +96,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("StorageLevel object caching") { - val level1 = StorageLevel(false, false, false, 3) - val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1 - val level3 = StorageLevel(false, false, false, 2) // this should return a different object + val level1 = StorageLevel(false, false, false, false, 3) + val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1 + val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object assert(level2 === level1, "level2 is not same as level1") assert(level2.eq(level1), "level2 is not the same object as level1") assert(level3 != level1, "level3 is same as level1") @@ -410,6 +410,25 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store") } + test("tachyon storage") { + // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar. + val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false) + if (tachyonUnitTestEnabled) { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.OFF_HEAP) + store.putSingle("a2", a2, StorageLevel.OFF_HEAP) + store.putSingle("a3", a3, StorageLevel.OFF_HEAP) + assert(store.getSingle("a3").isDefined, "a3 was in store") + assert(store.getSingle("a2").isDefined, "a2 was in store") + assert(store.getSingle("a1").isDefined, "a1 was in store") + } else { + info("tachyon storage test disabled.") + } + } + test("on-disk storage") { store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 40c29014c4..054eb01a64 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -456,7 +456,7 @@ class JsonProtocolSuite extends FunSuite { t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => - (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i)) + (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i)) }.toSeq) t } @@ -470,19 +470,19 @@ class JsonProtocolSuite extends FunSuite { """ {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": "greetings","Number of Tasks":200,"RDD Info":{"RDD ID":100,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}, - "Number of Partitions":200,"Number of Cached Partitions":300,"Memory Size":400, - "Disk Size":500},"Emitted Task Size Warning":false},"Properties":{"France":"Paris", - "Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, + "Replication":1},"Number of Partitions":200,"Number of Cached Partitions":300, + "Memory Size":400,"Disk Size":500,"Tachyon Size":0},"Emitted Task Size Warning":false}, + "Properties":{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} """ private val stageCompletedJsonString = """ {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name": "greetings","Number of Tasks":201,"RDD Info":{"RDD ID":101,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}, - "Number of Partitions":201,"Number of Cached Partitions":301,"Memory Size":401, - "Disk Size":501},"Emitted Task Size Warning":false}} + Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, + "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, + "Memory Size":401,"Disk Size":501,"Tachyon Size":0},"Emitted Task Size Warning":false}} """ private val taskStartJsonString = @@ -515,8 +515,8 @@ class JsonProtocolSuite extends FunSuite { 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics": {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks": [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status": - {"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":false, - "Replication":2},"Memory Size":0,"Disk Size":0}}]}} + {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} """ private val jobStartJsonString = |