-rw-r--r--  core/pom.xml | 47
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/StorageLevels.java | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 86
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala | 72
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala | 155
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonStore.scala | 142
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 46
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 20
-rw-r--r--  docs/configuration.md | 39
-rw-r--r--  docs/scala-programming-guide.md | 127
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkPi.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala | 80
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala | 52
-rw-r--r--  project/SparkBuild.scala | 17
-rw-r--r--  python/pyspark/context.py | 7
-rw-r--r--  python/pyspark/rdd.py | 3
-rw-r--r--  python/pyspark/storagelevel.py | 28
29 files changed, 976 insertions, 169 deletions
diff --git a/core/pom.xml b/core/pom.xml
index e4c32eff0c..66f9fc4961 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -201,6 +201,53 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.tachyonproject</groupId>
+ <artifactId>tachyon</artifactId>
+ <version>0.4.1-thrift</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-jsp</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-webapp</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
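
The block above pulls in the Tachyon client for Maven builds while trimming its transitive Hadoop, Curator, Jetty, and test dependencies. For the sbt build (project/SparkBuild.scala is also touched by this commit per the diffstat, but not shown here), an equivalent declaration would look roughly like the sketch below; the exclusion rules mirror the Maven <exclusions> list and are illustrative, not the literal SparkBuild.scala edit.

    // build.sbt-style sketch of the same dependency; exclusions mirror the Maven block above.
    libraryDependencies += "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(
      ExclusionRule(organization = "org.apache.hadoop", name = "hadoop-client"),
      ExclusionRule(organization = "org.apache.curator", name = "curator-recipes"),
      ExclusionRule(organization = "org.apache.curator", name = "curator-test"),
      ExclusionRule(organization = "org.eclipse.jetty"),      // jsp, webapp, server, servlet
      ExclusionRule(organization = "junit", name = "junit"),
      ExclusionRule(organization = "org.powermock")           // junit4 module and mockito API
    )
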
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
index 9f13b39909..840a1bd93b 100644
--- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
+++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -23,17 +23,18 @@ import org.apache.spark.storage.StorageLevel;
* Expose some commonly useful storage level constants.
*/
public class StorageLevels {
- public static final StorageLevel NONE = create(false, false, false, 1);
- public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
- public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
- public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
- public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
- public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
- public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
- public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
- public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
- public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
- public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
+ public static final StorageLevel NONE = create(false, false, false, false, 1);
+ public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
+ public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
+ public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
+ public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
+ public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
+ public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
+ public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
+ public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
+ public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
+ public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
+ public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);
/**
* Create a new StorageLevel object.
@@ -42,7 +43,26 @@ public class StorageLevels {
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
- public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
- return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
+ @Deprecated
+ public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
+ int replication) {
+ return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
+ }
+
+ /**
+ * Create a new StorageLevel object.
+ * @param useDisk saved to disk, if true
+ * @param useMemory saved to memory, if true
+ * @param useOffHeap saved to Tachyon, if true
+ * @param deserialized saved as deserialized objects, if true
+ * @param replication replication factor
+ */
+ public static StorageLevel create(
+ boolean useDisk,
+ boolean useMemory,
+ boolean useOffHeap,
+ boolean deserialized,
+ int replication) {
+ return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
}
}
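
The old four-argument StorageLevels.create survives as a deprecated shim that forwards with useOffHeap = false, and a new OFF_HEAP constant is added. A minimal Scala sketch of the equivalences, using the Scala factory the Java shim delegates to:

    import org.apache.spark.storage.StorageLevel

    // The deprecated 4-arg factory maps onto the 5-arg one with useOffHeap = false.
    val memAndDisk = StorageLevel(true, true, false, true, 1)
    assert(memAndDisk == StorageLevel.MEMORY_AND_DISK)

    // OFF_HEAP sets only the off-heap flag; disk, memory and deserialized stay false.
    val offHeap = StorageLevel(false, false, true, false, 1)
    assert(offHeap == StorageLevel.OFF_HEAP)
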
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 835cffe37a..fcf16ce1b2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,14 +19,13 @@ package org.apache.spark
import java.io._
import java.net.URI
-import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.{Properties, UUID}
+import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -130,6 +129,11 @@ class SparkContext(
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
+ // Generate the random name for a temp folder in Tachyon
+ // Add a timestamp as the suffix here to make it more safe
+ val tachyonFolderName = "spark-" + randomUUID.toString()
+ conf.set("spark.tachyonStore.folderName", tachyonFolderName)
+
val isLocal = (master == "local" || master.startsWith("local["))
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
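
The folder name generated here is only half of the Tachyon path; the BlockManager change below combines it with spark.tachyonStore.baseDir and the executor id. A sketch of how the pieces compose (base dir is the default, executor id is illustrative):

    import java.util.UUID.randomUUID

    // SparkContext picks one folder name per application ...
    val tachyonFolderName = "spark-" + randomUUID.toString
    // ... and each executor's BlockManager builds its store path from it:
    val storeDir = "/tmp_spark_tachyon"   // spark.tachyonStore.baseDir (default)
    val executorId = "0"                  // illustrative
    val tachyonStorePath = s"$storeDir/$tachyonFolderName/$executorId"
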
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3486092a14..16887d8892 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
- executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+ executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
+ false)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -105,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
- Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+ Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+ sparkHostPort, cores),
name = "Executor")
workerUrl.foreach{ url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 210f3dbeeb..ceff3a067d 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -41,6 +41,12 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+ /** TachyonStore failed to initialize after many attempts. */
+ val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
+
+ /** TachyonStore failed to create a local temporary directory after many attempts. */
+ val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
+
def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -48,6 +54,9 @@ object ExecutorExitCode {
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
+ case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
+ case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
+ "TachyonStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 71584b6eb1..19138d9dde 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,22 +19,20 @@ package org.apache.spark.storage
import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
-
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
-
import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer
-
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._
+
sealed trait Values
case class ByteBufferValues(buffer: ByteBuffer) extends Values
@@ -59,6 +57,17 @@ private[spark] class BlockManager(
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
+ var tachyonInitialized = false
+ private[storage] lazy val tachyonStore: TachyonStore = {
+ val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
+ val appFolderName = conf.get("spark.tachyonStore.folderName")
+ val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
+ val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
+ val tachyonBlockManager = new TachyonBlockManager(
+ shuffleBlockManager, tachyonStorePath, tachyonMaster)
+ tachyonInitialized = true
+ new TachyonStore(this, tachyonBlockManager)
+ }
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
@@ -248,8 +257,10 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
+ val inTachyonSize = status.tachyonSize
val onDiskSize = status.diskSize
- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
+ master.updateBlockInfo(
+ blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
} else true
}
@@ -259,22 +270,24 @@ private[spark] class BlockManager(
* and the updated in-memory and on-disk sizes.
*/
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
- val (newLevel, inMemSize, onDiskSize) = info.synchronized {
+ val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized {
info.level match {
case null =>
- (StorageLevel.NONE, 0L, 0L)
+ (StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
+ val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
- val replication = if (inMem || onDisk) level.replication else 1
- val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
+ val replication = if (inMem || inTachyon || onDisk) level.replication else 1
+ val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
+ val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- (storageLevel, memSize, diskSize)
+ (storageLevel, memSize, diskSize, tachyonSize)
}
}
- BlockStatus(newLevel, inMemSize, onDiskSize)
+ BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize)
}
/**
@@ -354,6 +367,24 @@ private[spark] class BlockManager(
logDebug("Block " + blockId + " not found in memory")
}
}
+
+ // Look for the block in Tachyon
+ if (level.useOffHeap) {
+ logDebug("Getting block " + blockId + " from tachyon")
+ if (tachyonStore.contains(blockId)) {
+ tachyonStore.getBytes(blockId) match {
+ case Some(bytes) => {
+ if (!asValues) {
+ return Some(bytes)
+ } else {
+ return Some(dataDeserialize(blockId, bytes))
+ }
+ }
+ case None =>
+ logDebug("Block " + blockId + " not found in tachyon")
+ }
+ }
+ }
// Look for block on disk, potentially storing it back into memory if required:
if (level.useDisk) {
@@ -620,6 +651,23 @@ private[spark] class BlockManager(
}
// Keep track of which blocks are dropped from memory
res.droppedBlocks.foreach { block => updatedBlocks += block }
+ } else if (level.useOffHeap) {
+ // Save to Tachyon.
+ val res = data match {
+ case IteratorValues(iterator) =>
+ tachyonStore.putValues(blockId, iterator, level, false)
+ case ArrayBufferValues(array) =>
+ tachyonStore.putValues(blockId, array, level, false)
+ case ByteBufferValues(bytes) => {
+ bytes.rewind();
+ tachyonStore.putBytes(blockId, bytes, level)
+ }
+ }
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
+ case _ =>
+ }
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
@@ -644,8 +692,8 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
+ // Now that the block is in either the memory, tachyon, or disk store,
+ // let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
@@ -707,7 +755,8 @@ private[spark] class BlockManager(
*/
var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
- val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
+ val tLevel = StorageLevel(
+ level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
@@ -832,9 +881,10 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
+ val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
+ if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
- "the disk or memory store")
+ "the disk, memory, or tachyon store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
@@ -871,6 +921,9 @@ private[spark] class BlockManager(
if (level.useDisk) {
diskStore.remove(id)
}
+ if (level.useOffHeap) {
+ tachyonStore.remove(id)
+ }
iterator.remove()
logInfo("Dropped block " + id)
}
@@ -946,6 +999,9 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
+ if (tachyonInitialized) {
+ tachyonStore.clear()
+ }
metadataCleaner.cancel()
broadcastCleaner.cancel()
logInfo("BlockManager stopped")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index ed6937851b..4bc1b407ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): Boolean = {
+ diskSize: Long,
+ tachyonSize: Long): Boolean = {
val res = askDriverWithReply[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
logInfo("Updated info of block " + blockId)
res
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index ff2652b640..378f4cadc1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -73,10 +73,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
register(blockManagerId, maxMemSize, slaveActor)
sender ! true
- case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ case UpdateBlockInfo(
+ blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
// TODO: Ideally we want to handle all the message replies in receive instead of in the
// individual private methods.
- updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)
case GetLocations(blockId) =>
sender ! getLocations(blockId)
@@ -246,7 +247,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long) {
+ diskSize: Long,
+ tachyonSize: Long) {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.executorId == "<driver>" && !isLocal) {
@@ -265,7 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
return
}
- blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
+ blockManagerInfo(blockManagerId).updateBlockInfo(
+ blockId, storageLevel, memSize, diskSize, tachyonSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -309,8 +312,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}
-
-private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+private[spark] case class BlockStatus(
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long,
+ tachyonSize: Long)
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
@@ -336,7 +342,8 @@ private[spark] class BlockManagerInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long) {
+ diskSize: Long,
+ tachyonSize: Long) {
updateLastSeenMs()
@@ -350,23 +357,29 @@ private[spark] class BlockManagerInfo(
}
if (storageLevel.isValid) {
- /* isValid means it is either stored in-memory or on-disk.
+ /* isValid means it is either stored in-memory, on-disk or on-Tachyon.
* But the memSize here indicates the data size in or dropped from memory,
+ * tachyonSize here indicates the data size in or dropped from Tachyon,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
if (storageLevel.useMemory) {
- _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0))
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0))
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize))
+ _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0))
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
+ if (storageLevel.useOffHeap) {
+ _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
+ logInfo("Added %s on tachyon on %s (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
+ }
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val blockStatus: BlockStatus = _blocks.get(blockId)
@@ -381,6 +394,10 @@ private[spark] class BlockManagerInfo(
logInfo("Removed %s on %s on disk (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
+ if (blockStatus.storageLevel.useOffHeap) {
+ logInfo("Removed %s on %s on tachyon (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index bbb9529b5a..8a36b5cc42 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -53,11 +53,12 @@ private[storage] object BlockManagerMessages {
var blockId: BlockId,
var storageLevel: StorageLevel,
var memSize: Long,
- var diskSize: Long)
+ var diskSize: Long,
+ var tachyonSize: Long)
extends ToBlockManagerMaster
with Externalizable {
- def this() = this(null, null, null, 0, 0) // For deserialization only
+ def this() = this(null, null, null, 0, 0, 0) // For deserialization only
override def writeExternal(out: ObjectOutput) {
blockManagerId.writeExternal(out)
@@ -65,6 +66,7 @@ private[storage] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
+ out.writeLong(tachyonSize)
}
override def readExternal(in: ObjectInput) {
@@ -73,6 +75,7 @@ private[storage] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
+ tachyonSize = in.readLong()
}
}
@@ -81,13 +84,15 @@ private[storage] object BlockManagerMessages {
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): UpdateBlockInfo = {
- new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ diskSize: Long,
+ tachyonSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)
}
// For pattern-matching
- def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, BlockId, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ def unapply(h: UpdateBlockInfo)
+ : Option[(BlockManagerId, BlockId, StorageLevel, Long, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize, h.tachyonSize))
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 4212a539da..95e71de2d3 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -21,8 +21,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
- * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
- * in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
+ * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
+ * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
+ * multiple nodes.
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
@@ -30,45 +31,58 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
+ private var useOffHeap_ : Boolean,
private var deserialized_ : Boolean,
private var replication_ : Int = 1)
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
private def this(flags: Int, replication: Int) {
- this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
+ this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
- def this() = this(false, true, false) // For deserialization
+ def this() = this(false, true, false, false) // For deserialization
def useDisk = useDisk_
def useMemory = useMemory_
+ def useOffHeap = useOffHeap_
def deserialized = deserialized_
def replication = replication_
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+ if (useOffHeap) {
+ require(useDisk == false, "Off-heap storage level does not support using disk")
+ require(useMemory == false, "Off-heap storage level does not support using heap memory")
+ require(deserialized == false, "Off-heap storage level does not support deserialized storage")
+ require(replication == 1, "Off-heap storage level does not support multiple replication")
+ }
+
override def clone(): StorageLevel = new StorageLevel(
- this.useDisk, this.useMemory, this.deserialized, this.replication)
+ this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication)
override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
s.useDisk == useDisk &&
s.useMemory == useMemory &&
+ s.useOffHeap == useOffHeap &&
s.deserialized == deserialized &&
s.replication == replication
case _ =>
false
}
- def isValid = ((useMemory || useDisk) && (replication > 0))
+ def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0))
def toInt: Int = {
var ret = 0
if (useDisk_) {
- ret |= 4
+ ret |= 8
}
if (useMemory_) {
+ ret |= 4
+ }
+ if (useOffHeap_) {
ret |= 2
}
if (deserialized_) {
@@ -84,8 +98,9 @@ class StorageLevel private(
override def readExternal(in: ObjectInput) {
val flags = in.readByte()
- useDisk_ = (flags & 4) != 0
- useMemory_ = (flags & 2) != 0
+ useDisk_ = (flags & 8) != 0
+ useMemory_ = (flags & 4) != 0
+ useOffHeap_ = (flags & 2) != 0
deserialized_ = (flags & 1) != 0
replication_ = in.readByte()
}
@@ -93,14 +108,15 @@ class StorageLevel private(
@throws(classOf[IOException])
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
- override def toString: String =
- "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
+ override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format(
+ useDisk, useMemory, useOffHeap, deserialized, replication)
override def hashCode(): Int = toInt * 41 + replication
def description : String = {
var result = ""
result += (if (useDisk) "Disk " else "")
result += (if (useMemory) "Memory " else "")
+ result += (if (useOffHeap) "Tachyon " else "")
result += (if (deserialized) "Deserialized " else "Serialized ")
result += "%sx Replicated".format(replication)
result
@@ -113,22 +129,28 @@ class StorageLevel private(
* new storage levels.
*/
object StorageLevel {
- val NONE = new StorageLevel(false, false, false)
- val DISK_ONLY = new StorageLevel(true, false, false)
- val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
- val MEMORY_ONLY = new StorageLevel(false, true, true)
- val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
- val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
- val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
- val MEMORY_AND_DISK = new StorageLevel(true, true, true)
- val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
- val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
- val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
+ val NONE = new StorageLevel(false, false, false, false)
+ val DISK_ONLY = new StorageLevel(true, false, false, false)
+ val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
+ val MEMORY_ONLY = new StorageLevel(false, true, false, true)
+ val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
+ val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
+ val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
+ val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
+ val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
+ val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
+ val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
+ val OFF_HEAP = new StorageLevel(false, false, true, false)
+
+ /** Create a new StorageLevel object without setting useOffHeap */
+ def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
+ deserialized: Boolean, replication: Int) = getCachedStorageLevel(
+ new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
/** Create a new StorageLevel object */
- def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean,
- replication: Int = 1): StorageLevel =
- getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication))
+ def apply(useDisk: Boolean, useMemory: Boolean,
+ deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel(
+ new StorageLevel(useDisk, useMemory, false, deserialized, replication))
/** Create a new StorageLevel object from its integer representation */
def apply(flags: Int, replication: Int): StorageLevel =
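
The flag byte gains a bit: disk moves from 4 to 8, memory from 2 to 4, the new off-heap flag takes 2, and deserialized keeps 1, with replication still carried in a separate byte. A few worked values under that layout:

    import org.apache.spark.storage.StorageLevel

    // disk = 8, memory = 4, offHeap = 2, deserialized = 1
    assert(StorageLevel.OFF_HEAP.toInt == 2)                  // off-heap bit only
    assert(StorageLevel.MEMORY_AND_DISK.toInt == (8 | 4 | 1)) // 13
    assert(StorageLevel.DISK_ONLY_2.toInt == 8)               // replication byte is 2
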
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 26565f56ad..7a17495903 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -44,7 +44,7 @@ private[spark] class StorageStatusListener extends SparkListener {
storageStatusList.foreach { storageStatus =>
val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
unpersistedBlocksIds.foreach { blockId =>
- storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L)
+ storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 6153dfe0b7..ff6e84cf98 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -48,17 +48,23 @@ class StorageStatus(
}
private[spark]
-class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel)
- extends Ordered[RDDInfo] {
+class RDDInfo(
+ val id: Int,
+ val name: String,
+ val numPartitions: Int,
+ val storageLevel: StorageLevel) extends Ordered[RDDInfo] {
var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
+ var tachyonSize= 0L
override def toString = {
- ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
- "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
- numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize))
+ import Utils.bytesToString
+ ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" +
+ "TachyonSize: %s; DiskSize: %s").format(
+ name, id, storageLevel.toString, numCachedPartitions, numPartitions,
+ bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
@@ -105,14 +111,17 @@ object StorageUtils {
val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
- // Add up memory and disk sizes
- val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 }
+ // Add up memory, disk and Tachyon sizes
+ val persistedBlocks =
+ blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+ val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
rddInfoMap.get(rddId).map { rddInfo =>
rddInfo.numCachedPartitions = persistedBlocks.length
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
+ rddInfo.tachyonSize = tachyonSize
rddInfo
}
}.toArray
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
new file mode 100644
index 0000000000..b0b9674856
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Random}
+
+import tachyon.client.TachyonFS
+import tachyon.client.TachyonFile
+
+import org.apache.spark.Logging
+import org.apache.spark.executor.ExecutorExitCode
+import org.apache.spark.network.netty.ShuffleSender
+import org.apache.spark.util.Utils
+
+
+/**
+ * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
+ * default, one block is mapped to one file with a name given by its BlockId.
+ *
+ * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
+ */
+private[spark] class TachyonBlockManager(
+ shuffleManager: ShuffleBlockManager,
+ rootDirs: String,
+ val master: String)
+ extends Logging {
+
+ val client = if (master != null && master != "") TachyonFS.get(master) else null
+
+ if (client == null) {
+ logError("Failed to connect to the Tachyon as the master address is not configured")
+ System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE)
+ }
+
+ private val MAX_DIR_CREATION_ATTEMPTS = 10
+ private val subDirsPerTachyonDir =
+ shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt
+
+ // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
+ // then, inside this directory, create multiple subdirectories that we will hash files into,
+ // in order to avoid having really large inodes at the top level in Tachyon.
+ private val tachyonDirs: Array[TachyonFile] = createTachyonDirs()
+ private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
+
+ addShutdownHook()
+
+ def removeFile(file: TachyonFile): Boolean = {
+ client.delete(file.getPath(), false)
+ }
+
+ def fileExists(file: TachyonFile): Boolean = {
+ client.exist(file.getPath())
+ }
+
+ def getFile(filename: String): TachyonFile = {
+ // Figure out which tachyon directory it hashes to, and which subdirectory in that
+ val hash = Utils.nonNegativeHash(filename)
+ val dirId = hash % tachyonDirs.length
+ val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir
+
+ // Create the subdirectory if it doesn't already exist
+ var subDir = subDirs(dirId)(subDirId)
+ if (subDir == null) {
+ subDir = subDirs(dirId).synchronized {
+ val old = subDirs(dirId)(subDirId)
+ if (old != null) {
+ old
+ } else {
+ val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
+ client.mkdir(path)
+ val newDir = client.getFile(path)
+ subDirs(dirId)(subDirId) = newDir
+ newDir
+ }
+ }
+ }
+ val filePath = subDir + "/" + filename
+ if(!client.exist(filePath)) {
+ client.createFile(filePath)
+ }
+ val file = client.getFile(filePath)
+ file
+ }
+
+ def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
+
+ // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
+ private def createTachyonDirs(): Array[TachyonFile] = {
+ logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
+ val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
+ rootDirs.split(",").map { rootDir =>
+ var foundLocalDir = false
+ var tachyonDir: TachyonFile = null
+ var tachyonDirId: String = null
+ var tries = 0
+ val rand = new Random()
+ while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
+ tries += 1
+ try {
+ tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
+ val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
+ if (!client.exist(path)) {
+ foundLocalDir = client.mkdir(path)
+ tachyonDir = client.getFile(path)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
+ }
+ }
+ if (!foundLocalDir) {
+ logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
+ rootDir)
+ System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
+ }
+ logInfo("Created tachyon directory at " + tachyonDir)
+ tachyonDir
+ }
+ }
+
+ private def addShutdownHook() {
+ tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+ Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
+ override def run() {
+ logDebug("Shutdown hook called")
+ tachyonDirs.foreach { tachyonDir =>
+ try {
+ if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+ Utils.deleteRecursively(tachyonDir, client)
+ }
+ } catch {
+ case t: Throwable =>
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, t)
+ }
+ }
+ }
+ })
+ }
+}
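
getFile places each block file by hashing its name twice: once to pick a root directory from the comma-separated rootDirs, and once to pick one of spark.tachyonStore.subDirectories (default 64) subdirectories. A small sketch of that placement, using math.abs(hashCode) as a stand-in for Utils.nonNegativeHash and illustrative sizes:

    val numRootDirs = 2             // one per comma-separated entry in rootDirs
    val subDirsPerTachyonDir = 64   // spark.tachyonStore.subDirectories (default)

    val filename = "rdd_0_0"                 // a BlockId name
    val hash = math.abs(filename.hashCode)   // stand-in for Utils.nonNegativeHash
    val dirId = hash % numRootDirs
    val subDirId = (hash / numRootDirs) % subDirsPerTachyonDir
    println(f"stored under rootDirs($dirId)/$subDirId%02x/$filename")
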
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
new file mode 100644
index 0000000000..b86abbda1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import tachyon.client.TachyonFile
+
+/**
+ * References a particular segment of a file (potentially the entire file), based off an offset and
+ * a length.
+ */
+private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
+ override def toString = "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
new file mode 100644
index 0000000000..c37e76f893
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.IOException
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import tachyon.client.{WriteType, ReadType}
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import org.apache.spark.serializer.Serializer
+
+
+private class Entry(val size: Long)
+
+
+/**
+ * Stores BlockManager blocks on Tachyon.
+ */
+private class TachyonStore(
+ blockManager: BlockManager,
+ tachyonManager: TachyonBlockManager)
+ extends BlockStore(blockManager: BlockManager) with Logging {
+
+ logInfo("TachyonStore started")
+
+ override def getSize(blockId: BlockId): Long = {
+ tachyonManager.getFile(blockId.name).length
+ }
+
+ override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ putToTachyonStore(blockId, bytes, true)
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult = {
+ return putValues(blockId, values.toIterator, level, returnValues)
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult = {
+ logDebug("Attempting to write values for block " + blockId)
+ val _bytes = blockManager.dataSerialize(blockId, values)
+ putToTachyonStore(blockId, _bytes, returnValues)
+ }
+
+ private def putToTachyonStore(
+ blockId: BlockId,
+ bytes: ByteBuffer,
+ returnValues: Boolean): PutResult = {
+ // So that we do not modify the input offsets !
+ // duplicate does not copy buffer, so inexpensive
+ val byteBuffer = bytes.duplicate()
+ byteBuffer.rewind()
+ logDebug("Attempting to put block " + blockId + " into Tachyon")
+ val startTime = System.currentTimeMillis
+ val file = tachyonManager.getFile(blockId)
+ val os = file.getOutStream(WriteType.TRY_CACHE)
+ os.write(byteBuffer.array())
+ os.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
+ blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime)))
+
+ if (returnValues) {
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ } else {
+ PutResult(bytes.limit(), null)
+ }
+ }
+
+ override def remove(blockId: BlockId): Boolean = {
+ val file = tachyonManager.getFile(blockId)
+ if (tachyonManager.fileExists(file)) {
+ tachyonManager.removeFile(file)
+ } else {
+ false
+ }
+ }
+
+ override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+ }
+
+
+ override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+ val file = tachyonManager.getFile(blockId)
+ if (file == null || file.getLocationHosts().size == 0) {
+ return None
+ }
+ val is = file.getInStream(ReadType.CACHE)
+ var buffer: ByteBuffer = null
+ try {
+ if (is != null) {
+ val size = file.length
+ val bs = new Array[Byte](size.asInstanceOf[Int])
+ val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
+ buffer = ByteBuffer.wrap(bs)
+ if (fetchSize != size) {
+ logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size +
+ " is not equal to fetched size " + fetchSize)
+ return None
+ }
+ }
+ } catch {
+ case ioe: IOException => {
+ logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe)
+ return None
+ }
+ }
+ Some(buffer)
+ }
+
+ override def contains(blockId: BlockId): Boolean = {
+ val file = tachyonManager.getFile(blockId)
+ tachyonManager.fileExists(file)
+ }
+}
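
TachyonStore serializes values through the BlockManager and then writes the raw bytes with WriteType.TRY_CACHE, reading them back with ReadType.CACHE. A stand-alone sketch of the same write/read pattern against the raw Tachyon client, assuming a master at the default URL; the path and payload are illustrative:

    import java.nio.ByteBuffer
    import tachyon.client.{ReadType, TachyonFS, WriteType}

    val client = TachyonFS.get("tachyon://localhost:19998")
    val path = "/tmp_spark_tachyon/demo/block_0"
    if (!client.exist(path)) client.createFile(path)
    val file = client.getFile(path)

    val out = file.getOutStream(WriteType.TRY_CACHE)
    out.write("hello tachyon".getBytes("UTF-8"))
    out.close()

    val in = file.getInStream(ReadType.CACHE)
    val bytes = new Array[Byte](file.length.toInt)
    in.read(bytes, 0, bytes.length)
    in.close()
    val buffer = ByteBuffer.wrap(bytes)
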
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index b2732de510..0fa461e5e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) {
private lazy val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
+
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
@@ -45,6 +46,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) {
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
+ "Size in Tachyon",
"Size on Disk")
/** Render an HTML row representing an RDD */
@@ -60,6 +62,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) {
<td>{rdd.numCachedPartitions}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td>{Utils.bytesToString(rdd.memSize)}</td>
+ <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
<td>{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index d9a6af6187..2155a8888c 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -274,12 +274,14 @@ private[spark] object JsonProtocol {
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
+ ("Tachyon Size" -> rddInfo.tachyonSize) ~
("Disk Size" -> rddInfo.diskSize)
}
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
+ ("Use Tachyon" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
@@ -288,6 +290,7 @@ private[spark] object JsonProtocol {
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
("Storage Level" -> storageLevel) ~
("Memory Size" -> blockStatus.memSize) ~
+ ("Tachyon Size" -> blockStatus.tachyonSize) ~
("Disk Size" -> blockStatus.diskSize)
}
@@ -570,11 +573,13 @@ private[spark] object JsonProtocol {
val numPartitions = (json \ "Number of Partitions").extract[Int]
val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
val memSize = (json \ "Memory Size").extract[Long]
+ val tachyonSize = (json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
+ rddInfo.tachyonSize = tachyonSize
rddInfo.diskSize = diskSize
rddInfo
}
@@ -582,16 +587,18 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
+ val useTachyon = (json \ "Use Tachyon").extract[Boolean]
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
- StorageLevel(useDisk, useMemory, deserialized, replication)
+ StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication)
}
def blockStatusFromJson(json: JValue): BlockStatus = {
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val memorySize = (json \ "Memory Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
- BlockStatus(storageLevel, memorySize, diskSize)
+ val tachyonSize = (json \ "Tachyon Size").extract[Long]
+ BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
}
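
storageLevelToJson and blockStatusToJson now carry "Use Tachyon" and "Tachyon Size", and the readers expect them. An illustrative json4s sketch of the resulting shape for an off-heap block (values made up; built with the DSL directly, since the JsonProtocol helpers are private to Spark):

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    val level =
      ("Use Disk" -> false) ~ ("Use Memory" -> false) ~ ("Use Tachyon" -> true) ~
      ("Deserialized" -> false) ~ ("Replication" -> 1)
    val status =
      ("Storage Level" -> level) ~ ("Memory Size" -> 0L) ~
      ("Tachyon Size" -> 1024L) ~ ("Disk Size" -> 0L)
    println(compact(render(status)))
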
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 737b765e2a..d3c39dee33 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,11 +34,13 @@ import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
+import tachyon.client.{TachyonFile,TachyonFS}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+
/**
* Various utility methods used by Spark.
*/
@@ -153,6 +155,7 @@ private[spark] object Utils extends Logging {
}
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
+ private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -162,6 +165,14 @@ private[spark] object Utils extends Logging {
}
}
+ // Register the tachyon path to be deleted via shutdown hook
+ def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
+ val absolutePath = tachyonfile.getPath()
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths += absolutePath
+ }
+ }
+
// Is the path already registered to be deleted via a shutdown hook ?
def hasShutdownDeleteDir(file: File): Boolean = {
val absolutePath = file.getAbsolutePath()
@@ -170,6 +181,14 @@ private[spark] object Utils extends Logging {
}
}
+ // Is the path already registered to be deleted via a shutdown hook ?
+ def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+ val absolutePath = file.getPath()
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.contains(absolutePath)
+ }
+ }
+
// Note: if file is child of some registered path, while not equal to it, then return true;
// else false. This is to ensure that two shutdown hooks do not try to delete each others
// paths - resulting in IOException and incomplete cleanup.
@@ -186,6 +205,22 @@ private[spark] object Utils extends Logging {
retval
}
+ // Note: if file is child of some registered path, while not equal to it, then return true;
+ // else false. This is to ensure that two shutdown hooks do not try to delete each others
+ // paths - resulting in Exception and incomplete cleanup.
+ def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+ val absolutePath = file.getPath()
+ val retval = shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.find { path =>
+ !absolutePath.equals(path) && absolutePath.startsWith(path)
+ }.isDefined
+ }
+ if (retval) {
+ logInfo("path = " + file + ", already present as root for deletion.")
+ }
+ retval
+ }
+
/** Create a temporary directory inside the given parent directory */
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
var attempts = 0
@@ -541,7 +576,16 @@ private[spark] object Utils extends Logging {
}
/**
- * Check to see if file is a symbolic link.
+ * Delete a file or directory and its contents recursively.
+ */
+ def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
+ if (!client.delete(dir.getPath(), true)) {
+ throw new IOException("Failed to delete the tachyon dir: " + dir)
+ }
+ }
+
+ /**
+ * Check to see if file is a symbolic link.
*/
def isSymlink(file: File): Boolean = {
if (file == null) throw new NullPointerException("File must not be null")
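
The new TachyonFile overloads mirror the existing File-based shutdown-hook bookkeeping: register a directory, then skip deleting anything whose registered ancestor will already be removed. A small sketch of the containment predicate those checks apply, with illustrative paths:

    // A child of a registered directory counts as covered; the directory itself does not.
    val registered = Set("/tmp_spark_tachyon/spark-1234abcd/0")

    def coveredByRegisteredRoot(path: String): Boolean =
      registered.exists(root => path != root && path.startsWith(root))

    assert(coveredByRegisteredRoot("/tmp_spark_tachyon/spark-1234abcd/0/spark-tachyon-20140404093000-00ab"))
    assert(!coveredByRegisteredRoot("/tmp_spark_tachyon/spark-1234abcd/0"))
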
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e83cd55e73..b6dd052610 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -96,9 +96,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("StorageLevel object caching") {
- val level1 = StorageLevel(false, false, false, 3)
- val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1
- val level3 = StorageLevel(false, false, false, 2) // this should return a different object
+ val level1 = StorageLevel(false, false, false, false, 3)
+ val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1
+ val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object
assert(level2 === level1, "level2 is not same as level1")
assert(level2.eq(level1), "level2 is not the same object as level1")
assert(level3 != level1, "level3 is same as level1")
@@ -410,6 +410,25 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
}
+ test("tachyon storage") {
+ // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
+ val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
+ if (tachyonUnitTestEnabled) {
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.OFF_HEAP)
+ store.putSingle("a2", a2, StorageLevel.OFF_HEAP)
+ store.putSingle("a3", a3, StorageLevel.OFF_HEAP)
+ assert(store.getSingle("a3").isDefined, "a3 was in store")
+ assert(store.getSingle("a2").isDefined, "a2 was in store")
+ assert(store.getSingle("a1").isDefined, "a1 was in store")
+ } else {
+ info("tachyon storage test disabled.")
+ }
+ }
+
test("on-disk storage") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
val a1 = new Array[Byte](400)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 40c29014c4..054eb01a64 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -456,7 +456,7 @@ class JsonProtocolSuite extends FunSuite {
t.shuffleWriteMetrics = Some(sw)
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
- (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
+ (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c % i))
}.toSeq)
t
}
@@ -470,19 +470,19 @@ class JsonProtocolSuite extends FunSuite {
"""
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
"greetings","Number of Tasks":200,"RDD Info":{"RDD ID":100,"Name":"mayor","Storage
- Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},
- "Number of Partitions":200,"Number of Cached Partitions":300,"Memory Size":400,
- "Disk Size":500},"Emitted Task Size Warning":false},"Properties":{"France":"Paris",
- "Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+ Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
+ "Replication":1},"Number of Partitions":200,"Number of Cached Partitions":300,
+ "Memory Size":400,"Disk Size":500,"Tachyon Size":0},"Emitted Task Size Warning":false},
+ "Properties":{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
"""
private val stageCompletedJsonString =
"""
{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name":
"greetings","Number of Tasks":201,"RDD Info":{"RDD ID":101,"Name":"mayor","Storage
- Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},
- "Number of Partitions":201,"Number of Cached Partitions":301,"Memory Size":401,
- "Disk Size":501},"Emitted Task Size Warning":false}}
+ Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
+ "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
+ "Memory Size":401,"Disk Size":501,"Tachyon Size":0},"Emitted Task Size Warning":false}}
"""
private val taskStartJsonString =
@@ -515,8 +515,8 @@ class JsonProtocolSuite extends FunSuite {
700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
[{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status":
- {"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":false,
- "Replication":2},"Memory Size":0,"Disk Size":0}}]}}
+ {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
"""
private val jobStartJsonString =
diff --git a/docs/configuration.md b/docs/configuration.md
index 1ff0150567..b6005acac8 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -123,6 +123,21 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.tachyonStore.baseDir</td>
+ <td>System.getProperty("java.io.tmpdir")</td>
+ <td>
+ Directories of the Tachyon file system in which to store RDDs. The Tachyon file system's URL is set by <code>spark.tachyonStore.url</code>.
+ It can also be a comma-separated list of multiple directories on the Tachyon file system.
+ </td>
+</tr>
+<tr>
+ <td>spark.tachyonStore.url</td>
+ <td>tachyon://localhost:19998</td>
+ <td>
+ The URL of the underlying Tachyon file system in the TachyonStore.
+ </td>
+</tr>
+<tr>
<td>spark.mesos.coarse</td>
<td>false</td>
<td>
@@ -161,13 +176,13 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.ui.acls.enable</td>
<td>false</td>
<td>
- Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has
+ Whether Spark web UI ACLs should be enabled. If enabled, this checks to see if the user has
access permissions to view the web ui. See <code>spark.ui.view.acls</code> for more details.
Also note this requires the user to be known, if the user comes across as null no checks
are done. Filters can be used to authenticate and set the user.
</td>
</tr>
-<tr>
+<tr>
<td>spark.ui.view.acls</td>
<td>Empty</td>
<td>
@@ -276,10 +291,10 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.serializer.objectStreamReset</td>
<td>10000</td>
<td>
- When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches
- objects to prevent writing redundant data, however that stops garbage collection of those
- objects. By calling 'reset' you flush that info from the serializer, and allow old
- objects to be collected. To turn off this periodic reset set it to a value of <= 0.
+ When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches
+ objects to prevent writing redundant data; however, that stops garbage collection of those
+ objects. By calling 'reset' you flush that info from the serializer, allowing old
+ objects to be collected. To turn off this periodic reset, set it to a value of <= 0.
By default it will reset the serializer every 10,000 objects.
</td>
</tr>
@@ -375,7 +390,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.akka.heartbeat.interval</td>
<td>1000</td>
<td>
- This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those.
+ This is set to a larger value to disable the failure detector built into Akka. It can be enabled again if you plan to use this feature (not recommended). A larger interval value in seconds reduces network overhead, while a smaller value (~ 1 s) might be more informative for Akka's failure detector. Tune this in combination with `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. The only positive use case for the failure detector is that a sensitive detector can help evict rogue executors quickly. However, this is usually not needed, as GC pauses and network lags are expected in a real Spark cluster. Moreover, enabling it causes a large number of heartbeat exchanges between nodes, flooding the network.
</td>
</tr>
<tr>
@@ -430,7 +445,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.broadcast.blockSize</td>
<td>4096</td>
<td>
- Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
+ Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
@@ -555,7 +570,7 @@ Apart from these, the following properties are also available, and may be useful
the driver.
</td>
</tr>
-<tr>
+<tr>
<td>spark.authenticate</td>
<td>false</td>
<td>
@@ -563,7 +578,7 @@ Apart from these, the following properties are also available, and may be useful
running on Yarn.
</td>
</tr>
-<tr>
+<tr>
<td>spark.authenticate.secret</td>
<td>None</td>
<td>
@@ -571,12 +586,12 @@ Apart from these, the following properties are also available, and may be useful
not running on Yarn and authentication is enabled.
</td>
</tr>
-<tr>
+<tr>
<td>spark.core.connection.auth.wait.timeout</td>
<td>30</td>
<td>
Number of seconds for the connection to wait for authentication to occur before timing
- out and giving up.
+ out and giving up.
</td>
</tr>
<tr>
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 99412733d4..77373890ee 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -23,7 +23,7 @@ To write a Spark application, you need to add a dependency on Spark. If you use
groupId = org.apache.spark
artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
- version = {{site.SPARK_VERSION}}
+ version = {{site.SPARK_VERSION}}
In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS:
@@ -73,14 +73,14 @@ The master URL passed to Spark can be in one of the following formats:
<table class="table">
<tr><th>Master URL</th><th>Meaning</th></tr>
<tr><td> local </td><td> Run Spark locally with one worker thread (i.e. no parallelism at all). </td></tr>
-<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
+<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
</td></tr>
-<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone
- cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default.
+<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone
+ cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default.
</td></tr>
-<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster.
- The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use,
- which is 5050 by default.
+<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster.
+ The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use,
+ which is 5050 by default.
</td></tr>
</table>
@@ -265,11 +265,25 @@ A complete list of actions is available in the [RDD API doc](api/core/index.html
## RDD Persistence
-One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter.
-
-You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
-
-In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing a [`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) object to `persist()`. The `cache()` method is a shorthand for using the default storage level, which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of available storage levels is:
+One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
+across operations. When you persist an RDD, each node stores any slices of it that it computes in
+memory and reuses them in other actions on that dataset (or datasets derived from it). This allows
+future actions to be much faster (often by more than 10x). Caching is a key tool for building
+iterative algorithms with Spark and for interactive use from the interpreter.
+
+You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time
+it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant --
+if any partition of an RDD is lost, it will automatically be recomputed using the transformations
+that originally created it.
+
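+As a minimal sketch (assuming a `SparkContext` named `sc`, as in the Spark shell, and an input file
+`data.txt`):
+
+{% highlight scala %}
+val lines = sc.textFile("data.txt")
+lines.cache()    // shorthand for lines.persist(StorageLevel.MEMORY_ONLY)
+
+lines.count()    // first action: computes the partitions and caches them in memory
+lines.count()    // later actions reuse the cached partitions instead of re-reading the file
+{% endhighlight %}
+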
+In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to
+persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space),
+or replicate it across nodes, or store the data in off-heap memory in [Tachyon](http://tachyon-project.org/).
+These levels are chosen by passing a
+[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel)
+object to `persist()`. The `cache()` method is a shorthand for using the default storage level,
+which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of
+available storage levels is:
<table class="table">
<tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
@@ -292,8 +306,16 @@ In addition, each RDD can be stored using a different *storage level*, allowing
</tr>
<tr>
<td> MEMORY_AND_DISK_SER </td>
- <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them
- on the fly each time they're needed. </td>
+ <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
+ recomputing them on the fly each time they're needed. </td>
+</tr>
+<tr>
+ <td> OFF_HEAP </td>
+ <td> Store RDD in a <i>serialized</i> format in Tachyon.
+ This is generally more space-efficient than deserialized objects, especially when using a
+ <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
+ It also significantly reduces garbage collection overhead.
+ </td>
</tr>
<tr>
<td> DISK_ONLY </td>
@@ -307,30 +329,59 @@ In addition, each RDD can be stored using a different *storage level*, allowing
### Which Storage Level to Choose?
-Spark's storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency.
-We recommend going through the following process to select one:
-
-* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. This is the most
- CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
-* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to make the objects
- much more space-efficient, but still reasonably fast to access.
-* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter a large
- amount of the data. Otherwise, recomputing a partition is about as fast as reading it from disk.
-* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web
- application). *All* the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones
- let you continue running tasks on the RDD without waiting to recompute a lost partition.
-
-If you want to define your own storage level (say, with replication factor of 3 instead of 2), then use the function factor method `apply()` of the [`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object.
+Spark's storage levels are meant to provide different trade-offs between memory usage and CPU
+efficiency. They let you choose among memory, disk, and Tachyon for storing data. We recommend going
+through the following process to select one:
+
+* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way.
+ This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
+
+* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
+make the objects much more space-efficient, but still reasonably fast to access. You can also use
+`OFF_HEAP` mode to store the data off the heap in [Tachyon](http://tachyon-project.org/). This will
+significantly reduce JVM GC overhead.
+
+* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
+a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from
+disk.
+
+* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve
+requests from a web application). *All* the storage levels provide full fault tolerance by
+recomputing lost data, but the replicated ones let you continue running tasks on the RDD without
+waiting to recompute a lost partition.
+
+If you want to define your own storage level (say, with a replication factor of 3 instead of 2), then
+use the factory method `apply()` of the
+[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object.
+
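+For example, a minimal sketch of a custom level (the argument order is `useDisk, useMemory,
+useOffHeap, deserialized, replication`, matching the updated factory method; `rdd` stands for any
+existing RDD):
+
+{% highlight scala %}
+import org.apache.spark.storage.StorageLevel
+
+// Serialized storage in memory and on disk, no off-heap (Tachyon) storage, replicated to 3 nodes.
+val MEMORY_AND_DISK_SER_3 = StorageLevel(true, true, false, false, 3)
+rdd.persist(MEMORY_AND_DISK_SER_3)
+{% endhighlight %}
+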
+Spark has a block manager inside the executors that lets you choose memory, disk, or off-heap
+storage. The last option stores RDDs off-heap, outside the executor JVM, on top of the memory
+management system [Tachyon](http://tachyon-project.org/). This mode has the following advantages
+(see the sketch after this list):
+
+* Cached data will not be lost if individual executors crash.
+* Executors can have a smaller memory footprint, allowing you to run more executors on the same
+machine, since the bulk of the memory will be inside Tachyon.
+* GC overhead is reduced, since the data is stored in Tachyon rather than on the JVM heap.
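+
+A minimal sketch of this mode (the master URL, application name, and Tachyon URL below are
+placeholder values; the Tachyon URL shown is simply the default of `spark.tachyonStore.url`):
+
+{% highlight scala %}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.storage.StorageLevel
+
+val conf = new SparkConf()
+  .setMaster("local[2]")
+  .setAppName("OffHeapExample")
+  .set("spark.tachyonStore.url", "tachyon://localhost:19998")
+val sc = new SparkContext(conf)
+
+val rdd = sc.parallelize(1 to 1000000)
+rdd.persist(StorageLevel.OFF_HEAP)   // partitions are written to Tachyon, outside the executor heap
+println(rdd.count())
+{% endhighlight %}
+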
# Shared Variables
-Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators.
+Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a
+remote cluster node, it works on separate copies of all the variables used in the function. These
+variables are copied to each machine, and no updates to the variables on the remote machine are
+propagated back to the driver program. Supporting general, read-write shared variables across tasks
+would be inefficient. However, Spark does provide two limited types of *shared variables* for two
+common usage patterns: broadcast variables and accumulators.
## Broadcast Variables
-Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
+Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather
+than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a
+large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables
+using efficient broadcast algorithms to reduce communication cost.
-Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. The interpreter session below shows this:
+Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The
+broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value`
+method. The interpreter session below shows this:
{% highlight scala %}
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
@@ -340,13 +391,21 @@ scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
{% endhighlight %}
-After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
+After the broadcast variable is created, it should be used instead of the value `v` in any functions
+run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object
+`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
+value of the broadcast variable (e.g. if the variable is shipped to a new node later).
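+
+For instance, a minimal sketch that reuses `broadcastVar` from the session above inside a cluster
+operation (the `parallelize` input is only an example):
+
+{% highlight scala %}
+val data = sc.parallelize(1 to 5)
+// Each task reads the broadcast value locally rather than shipping the array inside its closure.
+val sums = data.map(x => x + broadcastVar.value.sum).collect()
+// sums: Array[Int] = Array(7, 8, 9, 10, 11)
+{% endhighlight %}
+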
## Accumulators
-Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types.
+Accumulators are variables that are only "added" to through an associative operation and can
+therefore be efficiently supported in parallel. They can be used to implement counters (as in
+MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable
+collections, and programmers can add support for new types.
-An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method.
+An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
+running on the cluster can then add to it using the `+=` operator. However, they cannot read its
+value. Only the driver program can read the accumulator's value, using its `value` method.
The interpreter session below shows an accumulator being used to add up the elements of an array:
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index e5a09ecec0..d3babc3ed1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -18,8 +18,8 @@
package org.apache.spark.examples
import scala.math.random
+
import org.apache.spark._
-import SparkContext._
/** Computes an approximation to pi */
object SparkPi {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
new file mode 100644
index 0000000000..53b303d658
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import java.util.Random
+import scala.math.exp
+import org.apache.spark.util.Vector
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.InputFormatInfo
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Logistic regression based classification.
+ * This example uses Tachyon to persist RDDs during computation.
+ */
+object SparkTachyonHdfsLR {
+ val D = 10 // Number of dimensions
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def parsePoint(line: String): DataPoint = {
+ val tok = new java.util.StringTokenizer(line, " ")
+ var y = tok.nextToken.toDouble
+ var x = new Array[Double](D)
+ var i = 0
+ while (i < D) {
+ x(i) = tok.nextToken.toDouble; i += 1
+ }
+ DataPoint(new Vector(x), y)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>")
+ System.exit(1)
+ }
+ val inputPath = args(1)
+ val conf = SparkHadoopUtil.get.newConfiguration()
+ val sc = new SparkContext(args(0), "SparkTachyonHdfsLR",
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
+ InputFormatInfo.computePreferredLocations(
+ Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
+ ))
+ val lines = sc.textFile(inputPath)
+ val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP)
+ val ITERATIONS = args(2).toInt
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = points.map { p =>
+ (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+ }.reduce(_ + _)
+ w -= gradient
+ }
+
+ println("Final w: " + w)
+ System.exit(0)
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
new file mode 100644
index 0000000000..ce78f0876e
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.math.random
+
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Computes an approximation to pi.
+ * This example uses Tachyon to persist RDDs during computation.
+ */
+object SparkTachyonPi {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkTachyonPi <master> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "SparkTachyonPi",
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val n = 100000 * slices
+
+ val rdd = spark.parallelize(1 to n, slices)
+ rdd.persist(StorageLevel.OFF_HEAP)
+ val count = rdd.map { i =>
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x * x + y * y < 1) 1 else 0
+ }.reduce(_ + _)
+ println("Pi is roughly " + 4.0 * count / n)
+
+ spark.stop()
+ }
+}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c5c697e8e2..843a874fbf 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -30,7 +30,7 @@ import scala.collection.JavaConversions._
// import com.jsuereth.pgp.sbtplugin.PgpKeys._
object SparkBuild extends Build {
- val SPARK_VERSION = "1.0.0-SNAPSHOT"
+ val SPARK_VERSION = "1.0.0-SNAPSHOT"
// Hadoop version to build against. For example, "1.0.4" for Apache releases, or
// "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set
@@ -185,15 +185,14 @@ object SparkBuild extends Build {
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
resolvers ++= Seq(
- // HTTPS is unavailable for Maven Central
"Maven Repository" at "http://repo.maven.apache.org/maven2",
"Apache Repository" at "https://repository.apache.org/content/repositories/releases",
"JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",
"MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/",
- "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+ "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
// For Sonatype publishing
- //"sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- //"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/",
+ // "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
+ // "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/",
// also check the local Maven repository ~/.m2
Resolver.mavenLocal
),
@@ -280,13 +279,18 @@ object SparkBuild extends Build {
val slf4jVersion = "1.7.5"
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
+ val excludeEclipseJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeAsm = ExclusionRule(organization = "org.ow2.asm")
val excludeOldAsm = ExclusionRule(organization = "asm")
val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
val excludeSLF4J = ExclusionRule(organization = "org.slf4j")
val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact = "scalap")
+ val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
+ val excludeCurator = ExclusionRule(organization = "org.apache.curator")
+ val excludePowermock = ExclusionRule(organization = "org.powermock")
- def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark",
+
+ def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark",
version: String = "0.9.0-incubating", crossVersion: String = "2.10"): Option[sbt.ModuleID] = {
val fullId = if (crossVersion.isEmpty) id else id + "_" + crossVersion
Some(organization % fullId % version) // the artifact to compare binary compatibility with
@@ -323,6 +327,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm),
"com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm),
+ "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
"com.clearspring.analytics" % "stream" % "2.5.1"
),
libraryDependencies ++= maybeAvro
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index ff1023bbfa..d8667e84fe 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -423,8 +423,11 @@ class SparkContext(object):
raise Exception("storageLevel must be of type pyspark.StorageLevel")
newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
- return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
- storageLevel.deserialized, storageLevel.replication)
+ return newStorageLevel(storageLevel.useDisk,
+ storageLevel.useMemory,
+ storageLevel.useOffHeap,
+ storageLevel.deserialized,
+ storageLevel.replication)
def setJobGroup(self, groupId, description):
"""
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9943296b92..fb27863e07 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1302,11 +1302,12 @@ class RDD(object):
Get the RDD's current storage level.
>>> rdd1 = sc.parallelize([1,2])
>>> rdd1.getStorageLevel()
- StorageLevel(False, False, False, 1)
+ StorageLevel(False, False, False, False, 1)
"""
java_storage_level = self._jrdd.getStorageLevel()
storage_level = StorageLevel(java_storage_level.useDisk(),
java_storage_level.useMemory(),
+ java_storage_level.useOffHeap(),
java_storage_level.deserialized(),
java_storage_level.replication())
return storage_level
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index c3e3a44e8e..7b6660eab2 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -25,23 +25,25 @@ class StorageLevel:
Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
"""
- def __init__(self, useDisk, useMemory, deserialized, replication = 1):
+ def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication = 1):
self.useDisk = useDisk
self.useMemory = useMemory
+ self.useOffHeap = useOffHeap
self.deserialized = deserialized
self.replication = replication
def __repr__(self):
- return "StorageLevel(%s, %s, %s, %s)" % (
- self.useDisk, self.useMemory, self.deserialized, self.replication)
+ return "StorageLevel(%s, %s, %s, %s, %s)" % (
+ self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication)
-StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
-StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
-StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
-StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2)
-StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False)
-StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2)
-StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True)
-StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2)
-StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False)
-StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2)
+StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
+StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True)
+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
+StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False)
+StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
+StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
+StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
+StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1) \ No newline at end of file