Author:    Reynold Xin <rxin@databricks.com>  2016-01-15 12:03:28 -0800
Committer: Josh Rosen <joshrosen@databricks.com>  2016-01-15 12:03:28 -0800
Commit:    ad1503f92e1f6e960a24f9f5d36b1735d1f5073a
Tree:      564c1b3b8bea707d98d9467a7bbba87845051908
Parent:    5f83c6991c95616ecbc2878f8860c69b2826f56c
[SPARK-12667] Remove block manager's internal "external block store" API
This pull request removes the external block store API. The API is rarely used, and the file system interface is a better, more standard way to interact with external storage systems. There are a few other things to remove as well, as pointed out by JoshRosen; those will be done in follow-up pull requests.

Author: Reynold Xin <rxin@databricks.com>

Closes #10752 from rxin/remove-offheap.
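For readers looking for the replacement path the message alludes to, below is a minimal, hypothetical sketch of persisting an RDD through a Hadoop-compatible file system URI instead of the removed OFF_HEAP/external block store route. The application object and the path are illustrative assumptions, not part of this patch.

// Hypothetical user-side sketch, not part of this patch: instead of
// rdd.persist(StorageLevel.OFF_HEAP) backed by the removed ExternalBlockStore,
// write and read through a Hadoop-compatible file system URI.
import org.apache.spark.{SparkConf, SparkContext}

object ExternalFsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("external-fs-sketch"))
    val rdd = sc.parallelize(1 to 1000).map(i => (i, i * i))

    // Any Hadoop-compatible URI would do (hdfs://, s3a://, ...); this path is illustrative.
    val path = "hdfs://namenode:8020/tmp/squares"
    rdd.saveAsObjectFile(path)                     // persist via the file system interface
    val restored = sc.objectFile[(Int, Int)](path) // read it back later, even from another app
    println(restored.count())
    sc.stop()
  }
}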
 core/pom.xml                                                                  |  27
 core/src/main/scala/org/apache/spark/SparkContext.scala                       |   6
 core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala         |   8
 core/src/main/scala/org/apache/spark/status/api/v1/api.scala                  |   2
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala               |  55
 core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala         |   6
 core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala |  41
 core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala       |   7
 core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala        |   9
 core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala           |   6
 core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala       | 122
 core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala         | 211
 core/src/main/scala/org/apache/spark/storage/RDDInfo.scala                    |   9
 core/src/main/scala/org/apache/spark/storage/StorageLevel.scala               |   4
 core/src/main/scala/org/apache/spark/storage/StorageUtils.scala               |  31
 core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala        | 324
 core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala             |   8
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                  |  17
 core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala          |   4
 core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala           |   6
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala          |  26
 core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala   |  18
 core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala |  18
 core/src/test/scala/org/apache/spark/storage/StorageSuite.scala               |  97
 core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala        |  60
 core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala         |  30
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala             |  32
 dev/deps/spark-deps-hadoop-2.2                                                |   6
 dev/deps/spark-deps-hadoop-2.3                                                |   4
 dev/deps/spark-deps-hadoop-2.4                                                |   4
 dev/deps/spark-deps-hadoop-2.6                                                |   4
 examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala    |  93
 examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala        |  50
 project/MimaExcludes.scala                                                    |   6
 34 files changed, 139 insertions(+), 1212 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 3bec5debc2..2071a58de9 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -268,33 +268,6 @@
<version>${oro.version}</version>
</dependency>
<dependency>
- <groupId>org.tachyonproject</groupId>
- <artifactId>tachyon-client</artifactId>
- <version>0.8.2</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.tachyonproject</groupId>
- <artifactId>tachyon-underfs-glusterfs</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
<exclusions>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 98075cef11..77acb7052d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -243,10 +243,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
- // Generate the random name for a temp folder in external block store.
- // Add a timestamp as the suffix here to make it more safe
- val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
-
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
/**
@@ -423,8 +419,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
-
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index c115e0ff74..dad90fc220 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.{Logging, SparkEnv, SparkException, TaskContext}
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.Utils
@@ -72,12 +72,6 @@ private[spark] object LocalRDDCheckpointData {
* This method is idempotent.
*/
def transformStorageLevel(level: StorageLevel): StorageLevel = {
- // If this RDD is to be cached off-heap, fail fast since we cannot provide any
- // correctness guarantees about subsequent computations after the first one
- if (level.useOffHeap) {
- throw new SparkException("Local checkpointing is not compatible with off-heap caching.")
- }
-
StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 9cd52d6c2b..fe372116f1 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -85,8 +85,6 @@ class JobData private[spark](
val numSkippedStages: Int,
val numFailedStages: Int)
-// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage
-// page ... does anybody pay attention to it?
class RDDStorageInfo private[spark](
val id: Int,
val name: String,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4479e6875a..e49d79b8ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -83,13 +83,8 @@ private[spark] class BlockManager(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
// Actual storage of where blocks are kept
- private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, memoryManager)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
- private[spark] lazy val externalBlockStore: ExternalBlockStore = {
- externalBlockStoreInitialized = true
- new ExternalBlockStore(this, executorId)
- }
memoryManager.setMemoryStore(memoryStore)
// Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -313,8 +308,7 @@ private[spark] class BlockManager(
blockInfo.asScala.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
- // Assume that block is not in external block store
- BlockStatus(info.level, memSize, diskSize, 0L)
+ BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
}
}
@@ -363,10 +357,8 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
- master.updateBlockInfo(
- blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
}
@@ -381,20 +373,17 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
- BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+ BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
- val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
- val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+ val replication = if (inMem || onDisk) level.replication else 1
val storageLevel =
- StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
+ StorageLevel(onDisk, inMem, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
- val externalBlockStoreSize =
- if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
+ BlockStatus(storageLevel, memSize, diskSize)
}
}
}
@@ -475,25 +464,6 @@ private[spark] class BlockManager(
}
}
- // Look for the block in external block store
- if (level.useOffHeap) {
- logDebug(s"Getting block $blockId from ExternalBlockStore")
- if (externalBlockStore.contains(blockId)) {
- val result = if (asBlockResult) {
- externalBlockStore.getValues(blockId)
- .map(new BlockResult(_, DataReadMethod.Memory, info.size))
- } else {
- externalBlockStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in ExternalBlockStore")
- }
- }
- }
-
// Look for block on disk, potentially storing it back in memory if required
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
@@ -786,9 +756,6 @@ private[spark] class BlockManager(
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
- } else if (putLevel.useOffHeap) {
- // Use external block store
- (false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
@@ -909,8 +876,7 @@ private[spark] class BlockManager(
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
- val tLevel = StorageLevel(
- level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
+ val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
@@ -1120,9 +1086,7 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
- val removedFromExternalBlockStore =
- if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
- if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
+ if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
@@ -1212,9 +1176,6 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
- if (externalBlockStoreInitialized) {
- externalBlockStore.clear()
- }
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index da1de11d60..0b7aa599e9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,11 +54,9 @@ class BlockManagerMaster(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long): Boolean = {
+ diskSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel,
- memSize, diskSize, externalBlockStoreSize))
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logDebug(s"Updated info of block $blockId")
res
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4db400a344..fbb3df8c3e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -59,10 +59,9 @@ class BlockManagerMasterEndpoint(
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)
- case _updateBlockInfo @ UpdateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
- context.reply(updateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
+ case _updateBlockInfo @
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
case GetLocations(blockId) =>
@@ -325,8 +324,7 @@ class BlockManagerMasterEndpoint(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long): Boolean = {
+ diskSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
@@ -343,8 +341,7 @@ class BlockManagerMasterEndpoint(
return true
}
- blockManagerInfo(blockManagerId).updateBlockInfo(
- blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
+ blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -404,17 +401,13 @@ class BlockManagerMasterEndpoint(
}
@DeveloperApi
-case class BlockStatus(
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long) {
- def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
+case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
+ def isCached: Boolean = memSize + diskSize > 0
}
@DeveloperApi
object BlockStatus {
- def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+ def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
}
private[spark] class BlockManagerInfo(
@@ -443,8 +436,7 @@ private[spark] class BlockManagerInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long) {
+ diskSize: Long) {
updateLastSeenMs()
@@ -468,7 +460,7 @@ private[spark] class BlockManagerInfo(
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
- blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+ blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
@@ -476,17 +468,11 @@ private[spark] class BlockManagerInfo(
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
- blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+ blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
- if (storageLevel.useOffHeap) {
- blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
- _blocks.put(blockId, blockStatus)
- logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
- }
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
@@ -504,11 +490,6 @@ private[spark] class BlockManagerInfo(
logInfo("Removed %s on %s on disk (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
- if (blockStatus.storageLevel.useOffHeap) {
- logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
- blockId, blockManagerId.hostPort,
- Utils.bytesToString(blockStatus.externalBlockStoreSize)))
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index f392a4a0cd..6bded92700 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -63,12 +63,11 @@ private[spark] object BlockManagerMessages {
var blockId: BlockId,
var storageLevel: StorageLevel,
var memSize: Long,
- var diskSize: Long,
- var externalBlockStoreSize: Long)
+ var diskSize: Long)
extends ToBlockManagerMaster
with Externalizable {
- def this() = this(null, null, null, 0, 0, 0) // For deserialization only
+ def this() = this(null, null, null, 0, 0) // For deserialization only
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
blockManagerId.writeExternal(out)
@@ -76,7 +75,6 @@ private[spark] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
- out.writeLong(externalBlockStoreSize)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -85,7 +83,6 @@ private[spark] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
- externalBlockStoreSize = in.readLong()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
index 2789e25b8d..0a14fcadf5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
@@ -26,8 +26,7 @@ private[spark] case class BlockUIData(
location: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long)
+ diskSize: Long)
/**
* The aggregated status of stream blocks in an executor
@@ -41,8 +40,6 @@ private[spark] case class ExecutorStreamBlockStatus(
def totalDiskSize: Long = blocks.map(_.diskSize).sum
- def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum
-
def numStreamBlocks: Int = blocks.size
}
@@ -62,7 +59,6 @@ private[spark] class BlockStatusListener extends SparkListener {
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
- val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
synchronized {
// Drop the update info if the block manager is not registered
@@ -74,8 +70,7 @@ private[spark] class BlockStatusListener extends SparkListener {
blockManagerId.hostPort,
storageLevel,
memSize,
- diskSize,
- externalBlockStoreSize)
+ diskSize)
)
} else {
// If isValid is not true, it means we should drop the block.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
index a5790e4454..e070bf658a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
@@ -30,8 +30,7 @@ case class BlockUpdatedInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long)
+ diskSize: Long)
private[spark] object BlockUpdatedInfo {
@@ -41,7 +40,6 @@ private[spark] object BlockUpdatedInfo {
updateBlockInfo.blockId,
updateBlockInfo.storageLevel,
updateBlockInfo.memSize,
- updateBlockInfo.diskSize,
- updateBlockInfo.externalBlockStoreSize)
+ updateBlockInfo.diskSize)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
deleted file mode 100644
index f39325a12d..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * An abstract class that the concrete external block manager has to inherit.
- * The class has to have a no-argument constructor, and will be initialized by init,
- * which is invoked by ExternalBlockStore. The main input parameter is blockId for all
- * the methods, which is the unique identifier for Block in one Spark application.
- *
- * The underlying external block manager should avoid any name space conflicts among multiple
- * Spark applications. For example, creating different directory for different applications
- * by randomUUID
- *
- */
-private[spark] abstract class ExternalBlockManager {
-
- protected var blockManager: BlockManager = _
-
- override def toString: String = {"External Block Store"}
-
- /**
- * Initialize a concrete block manager implementation. Subclass should initialize its internal
- * data structure, e.g, file system, in this function, which is invoked by ExternalBlockStore
- * right after the class is constructed. The function should throw IOException on failure
- *
- * @throws java.io.IOException if there is any file system failure during the initialization.
- */
- def init(blockManager: BlockManager, executorId: String): Unit = {
- this.blockManager = blockManager
- }
-
- /**
- * Drop the block from underlying external block store, if it exists..
- * @return true on successfully removing the block
- * false if the block could not be removed as it was not found
- *
- * @throws java.io.IOException if there is any file system failure in removing the block.
- */
- def removeBlock(blockId: BlockId): Boolean
-
- /**
- * Used by BlockManager to check the existence of the block in the underlying external
- * block store.
- * @return true if the block exists.
- * false if the block does not exists.
- *
- * @throws java.io.IOException if there is any file system failure in checking
- * the block existence.
- */
- def blockExists(blockId: BlockId): Boolean
-
- /**
- * Put the given block to the underlying external block store. Note that in normal case,
- * putting a block should never fail unless something wrong happens to the underlying
- * external block store, e.g., file system failure, etc. In this case, IOException
- * should be thrown.
- *
- * @throws java.io.IOException if there is any file system failure in putting the block.
- */
- def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
-
- def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
- val bytes = blockManager.dataSerialize(blockId, values)
- putBytes(blockId, bytes)
- }
-
- /**
- * Retrieve the block bytes.
- * @return Some(ByteBuffer) if the block bytes is successfully retrieved
- * None if the block does not exist in the external block store.
- *
- * @throws java.io.IOException if there is any file system failure in getting the block.
- */
- def getBytes(blockId: BlockId): Option[ByteBuffer]
-
- /**
- * Retrieve the block data.
- * @return Some(Iterator[Any]) if the block data is successfully retrieved
- * None if the block does not exist in the external block store.
- *
- * @throws java.io.IOException if there is any file system failure in getting the block.
- */
- def getValues(blockId: BlockId): Option[Iterator[_]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
-
- /**
- * Get the size of the block saved in the underlying external block store,
- * which is saved before by putBytes.
- * @return size of the block
- * 0 if the block does not exist
- *
- * @throws java.io.IOException if there is any file system failure in getting the block size.
- */
- def getSize(blockId: BlockId): Long
-
- /**
- * Clean up any information persisted in the underlying external block store,
- * e.g., the directory, files, etc,which is invoked by the shutdown hook of ExternalBlockStore
- * during system shutdown.
- *
- */
- def shutdown()
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
deleted file mode 100644
index 94883a54a7..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.Logging
-import org.apache.spark.util.{ShutdownHookManager, Utils}
-
-
-/**
- * Stores BlockManager blocks on ExternalBlockStore.
- * We capture any potential exception from underlying implementation
- * and return with the expected failure value
- */
-private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId: String)
- extends BlockStore(blockManager: BlockManager) with Logging {
-
- lazy val externalBlockManager: Option[ExternalBlockManager] = createBlkManager()
-
- logInfo("ExternalBlockStore started")
-
- override def getSize(blockId: BlockId): Long = {
- try {
- externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getSize($blockId)", t)
- 0L
- }
- }
-
- override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
- putIntoExternalBlockStore(blockId, bytes, returnValues = true)
- }
-
- override def putArray(
- blockId: BlockId,
- values: Array[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
- }
-
- override def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIntoExternalBlockStore(blockId, values, returnValues)
- }
-
- private def putIntoExternalBlockStore(
- blockId: BlockId,
- values: Iterator[_],
- returnValues: Boolean): PutResult = {
- logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
- // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
- try {
- val startTime = System.currentTimeMillis
- if (externalBlockManager.isDefined) {
- externalBlockManager.get.putValues(blockId, values)
- val size = getSize(blockId)
- val data = if (returnValues) {
- Left(getValues(blockId).get)
- } else {
- null
- }
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
- blockId, Utils.bytesToString(size), finishTime - startTime))
- PutResult(size, data)
- } else {
- logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- } catch {
- case NonFatal(t) =>
- logError(s"Error in putValues($blockId)", t)
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- }
-
- private def putIntoExternalBlockStore(
- blockId: BlockId,
- bytes: ByteBuffer,
- returnValues: Boolean): PutResult = {
- logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
- // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
- try {
- val startTime = System.currentTimeMillis
- if (externalBlockManager.isDefined) {
- val byteBuffer = bytes.duplicate()
- byteBuffer.rewind()
- externalBlockManager.get.putBytes(blockId, byteBuffer)
- val size = bytes.limit()
- val data = if (returnValues) {
- Right(bytes)
- } else {
- null
- }
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
- blockId, Utils.bytesToString(size), finishTime - startTime))
- PutResult(size, data)
- } else {
- logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- } catch {
- case NonFatal(t) =>
- logError(s"Error in putBytes($blockId)", t)
- PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
- }
- }
-
- // We assume the block is removed even if exception thrown
- override def remove(blockId: BlockId): Boolean = {
- try {
- externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
- } catch {
- case NonFatal(t) =>
- logError(s"Error in removeBlock($blockId)", t)
- true
- }
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- try {
- externalBlockManager.flatMap(_.getValues(blockId))
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getValues($blockId)", t)
- None
- }
- }
-
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- try {
- externalBlockManager.flatMap(_.getBytes(blockId))
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getBytes($blockId)", t)
- None
- }
- }
-
- override def contains(blockId: BlockId): Boolean = {
- try {
- val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
- if (!ret) {
- logInfo(s"Remove block $blockId")
- blockManager.removeBlock(blockId, true)
- }
- ret
- } catch {
- case NonFatal(t) =>
- logError(s"Error in getBytes($blockId)", t)
- false
- }
- }
-
- // Create concrete block manager and fall back to Tachyon by default for backward compatibility.
- private def createBlkManager(): Option[ExternalBlockManager] = {
- val clsName = blockManager.conf.getOption(ExternalBlockStore.BLOCK_MANAGER_NAME)
- .getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
-
- try {
- val instance = Utils.classForName(clsName)
- .newInstance()
- .asInstanceOf[ExternalBlockManager]
- instance.init(blockManager, executorId)
- ShutdownHookManager.addShutdownHook { () =>
- logDebug("Shutdown hook called")
- externalBlockManager.map(_.shutdown())
- }
- Some(instance)
- } catch {
- case NonFatal(t) =>
- logError("Cannot initialize external block store", t)
- None
- }
- }
-}
-
-private[spark] object ExternalBlockStore extends Logging {
- val MAX_DIR_CREATION_ATTEMPTS = 10
- val SUB_DIRS_PER_DIR = "64"
- val BASE_DIR = "spark.externalBlockStore.baseDir"
- val FOLD_NAME = "spark.externalBlockStore.folderName"
- val MASTER_URL = "spark.externalBlockStore.url"
- val BLOCK_MANAGER_NAME = "spark.externalBlockStore.blockManager"
- val DEFAULT_BLOCK_MANAGER_NAME = "org.apache.spark.storage.TachyonBlockManager"
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 673f7ad79d..083d78b59e 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.Utils
@DeveloperApi
class RDDInfo(
@@ -37,15 +37,14 @@ class RDDInfo(
var diskSize = 0L
var externalBlockStoreSize = 0L
- def isCached: Boolean =
- (memSize + diskSize + externalBlockStoreSize > 0) && numCachedPartitions > 0
+ def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0
override def toString: String = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
- "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format(
+ "MemorySize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
- bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize))
+ bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo): Int = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 703bce3e6b..38e9534251 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -150,7 +150,9 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
- val OFF_HEAP = new StorageLevel(false, false, true, false)
+
+ // Redirect to MEMORY_ONLY_SER for now.
+ val OFF_HEAP = MEMORY_ONLY_SER
/**
* :: DeveloperApi ::
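Note the user-visible effect of the StorageLevel hunk above: after this patch StorageLevel.OFF_HEAP is simply an alias for MEMORY_ONLY_SER, so existing callers compile unchanged but cache serialized data on the JVM heap. A minimal sketch of that assumption from the caller's side (test-style code, not part of the diff):

// Sketch only, assuming the aliasing shown in the hunk above: OFF_HEAP and
// MEMORY_ONLY_SER are now the same level (serialized, in-memory, no disk, no off-heap).
import org.apache.spark.storage.StorageLevel

object OffHeapAliasCheck {
  def main(args: Array[String]): Unit = {
    assert(StorageLevel.OFF_HEAP == StorageLevel.MEMORY_ONLY_SER)
    assert(StorageLevel.OFF_HEAP.useMemory && !StorageLevel.OFF_HEAP.useDisk)
    assert(!StorageLevel.OFF_HEAP.deserialized && !StorageLevel.OFF_HEAP.useOffHeap)
  }
}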
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index c4ac30092f..8e2cfb2441 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -48,14 +48,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
* non-RDD blocks for the same reason. In particular, RDD storage information is stored
* in a map indexed by the RDD ID to the following 4-tuple:
*
- * (memory size, disk size, off-heap size, storage level)
+ * (memory size, disk size, storage level)
*
* We assume that all the blocks that belong to the same RDD have the same storage level.
* This field is not relevant to non-RDD blocks, however, so the storage information for
* non-RDD blocks contains only the first 3 fields (in the same order).
*/
- private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
- private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)
+ private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
+ private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
/** Create a storage status with an initial set of blocks, leaving the source unmodified. */
def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
@@ -177,20 +177,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
/** Return the disk space used by this block manager. */
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
- /** Return the off-heap space used by this block manager. */
- def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
-
/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
/** Return the disk space used by the given RDD in this block manager in O(1) time. */
def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
- /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
- def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
-
/** Return the storage level, if any, used by the given RDD in this block manager. */
- def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
/**
* Update the relevant storage info, taking into account any existing status for this block.
@@ -199,34 +193,31 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
- val changeInExternalBlockStore =
- newBlockStatus.externalBlockStoreSize - oldBlockStatus.externalBlockStoreSize
val level = newBlockStatus.storageLevel
// Compute new info from old info
- val (oldMem, oldDisk, oldExternalBlockStore) = blockId match {
+ val (oldMem, oldDisk) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, externalBlockStore, _) => (mem, disk, externalBlockStore) }
- .getOrElse((0L, 0L, 0L))
+ .map { case (mem, disk, _) => (mem, disk) }
+ .getOrElse((0L, 0L))
case _ =>
_nonRddStorageInfo
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
- val newExternalBlockStore = math.max(oldExternalBlockStore + changeInExternalBlockStore, 0L)
// Set the correct info
blockId match {
case RDDBlockId(rddId, _) =>
// If this RDD is no longer persisted, remove it
- if (newMem + newDisk + newExternalBlockStore == 0) {
+ if (newMem + newDisk == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, newExternalBlockStore, level)
+ _rddStorageInfo(rddId) = (newMem, newDisk, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk, newExternalBlockStore)
+ _nonRddStorageInfo = (newMem, newDisk)
}
}
@@ -248,13 +239,11 @@ private[spark] object StorageUtils {
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
- val externalBlockStoreSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
rddInfo.storageLevel = storageLevel
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
- rddInfo.externalBlockStoreSize = externalBlockStoreSize
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
deleted file mode 100644
index 6aa7e13901..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-import java.text.SimpleDateFormat
-import java.util.{Date, Random}
-
-import scala.util.control.NonFatal
-
-import com.google.common.io.ByteStreams
-import tachyon.{Constants, TachyonURI}
-import tachyon.client.ClientContext
-import tachyon.client.file.{TachyonFile, TachyonFileSystem}
-import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
-import tachyon.client.file.options.DeleteOptions
-import tachyon.conf.TachyonConf
-import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException}
-
-import org.apache.spark.Logging
-import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
-
-/**
- * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By
- * default, one block is mapped to one file with a name given by its BlockId.
- *
- */
-private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
-
- var rootDirs: String = _
- var master: String = _
- var client: TachyonFileSystem = _
- private var subDirsPerTachyonDir: Int = _
-
- // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
- // then, inside this directory, create multiple subdirectories that we will hash files into,
- // in order to avoid having really large inodes at the top level in Tachyon.
- private var tachyonDirs: Array[TachyonFile] = _
- private var subDirs: Array[Array[TachyonFile]] = _
- private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
-
- override def init(blockManager: BlockManager, executorId: String): Unit = {
- super.init(blockManager, executorId)
- val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
- val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
-
- rootDirs = s"$storeDir/$appFolderName/$executorId"
- master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
- client = if (master != null && master != "") {
- val tachyonConf = new TachyonConf()
- tachyonConf.set(Constants.MASTER_ADDRESS, master)
- ClientContext.reset(tachyonConf)
- TachyonFileSystemFactory.get
- } else {
- null
- }
- // original implementation call System.exit, we change it to run without extblkstore support
- if (client == null) {
- logError("Failed to connect to the Tachyon as the master address is not configured")
- throw new IOException("Failed to connect to the Tachyon as the master " +
- "address is not configured")
- }
- subDirsPerTachyonDir = blockManager.conf.get("spark.externalBlockStore.subDirectories",
- ExternalBlockStore.SUB_DIRS_PER_DIR).toInt
-
- // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
- // then, inside this directory, create multiple subdirectories that we will hash files into,
- // in order to avoid having really large inodes at the top level in Tachyon.
- tachyonDirs = createTachyonDirs()
- subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
- tachyonDirs.foreach(registerShutdownDeleteDir)
- }
-
- override def toString: String = {"ExternalBlockStore-Tachyon"}
-
- override def removeBlock(blockId: BlockId): Boolean = {
- val file = getFile(blockId)
- if (fileExists(file)) {
- removeFile(file)
- true
- } else {
- false
- }
- }
-
- override def blockExists(blockId: BlockId): Boolean = {
- val file = getFile(blockId)
- fileExists(file)
- }
-
- override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
- val file = getFile(blockId)
- val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
- try {
- Utils.writeByteBuffer(bytes, os)
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
- os.cancel()
- } finally {
- os.close()
- }
- }
-
- override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
- val file = getFile(blockId)
- val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
- try {
- blockManager.dataSerializeStream(blockId, os, values)
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to put values of block $blockId into Tachyon", e)
- os.cancel()
- } finally {
- os.close()
- }
- }
-
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- val file = getFile(blockId)
- if (file == null) {
- return None
- }
- val is = try {
- client.getInStream(file)
- } catch {
- case _: FileDoesNotExistException =>
- return None
- }
- try {
- val size = client.getInfo(file).length
- val bs = new Array[Byte](size.asInstanceOf[Int])
- ByteStreams.readFully(is, bs)
- Some(ByteBuffer.wrap(bs))
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
- None
- } finally {
- is.close()
- }
- }
-
- override def getValues(blockId: BlockId): Option[Iterator[_]] = {
- val file = getFile(blockId)
- if (file == null) {
- return None
- }
- val is = try {
- client.getInStream(file)
- } catch {
- case _: FileDoesNotExistException =>
- return None
- }
- try {
- Some(blockManager.dataDeserializeStream(blockId, is))
- } finally {
- is.close()
- }
- }
-
- override def getSize(blockId: BlockId): Long = {
- client.getInfo(getFile(blockId.name)).length
- }
-
- def removeFile(file: TachyonFile): Unit = {
- client.delete(file)
- }
-
- def fileExists(file: TachyonFile): Boolean = {
- try {
- client.getInfo(file)
- true
- } catch {
- case _: FileDoesNotExistException => false
- }
- }
-
- def getFile(filename: String): TachyonFile = {
- // Figure out which tachyon directory it hashes to, and which subdirectory in that
- val hash = Utils.nonNegativeHash(filename)
- val dirId = hash % tachyonDirs.length
- val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir
-
- // Create the subdirectory if it doesn't already exist
- var subDir = subDirs(dirId)(subDirId)
- if (subDir == null) {
- subDir = subDirs(dirId).synchronized {
- val old = subDirs(dirId)(subDirId)
- if (old != null) {
- old
- } else {
- val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
- client.mkdir(path)
- val newDir = client.loadMetadata(path)
- subDirs(dirId)(subDirId) = newDir
- newDir
- }
- }
- }
- val filePath = new TachyonURI(s"$subDir/$filename")
- try {
- client.create(filePath)
- } catch {
- case _: FileAlreadyExistsException => client.loadMetadata(filePath)
- }
- }
-
- def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
-
- // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
- private def createTachyonDirs(): Array[TachyonFile] = {
- logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
- val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
- rootDirs.split(",").map { rootDir =>
- var foundLocalDir = false
- var tachyonDir: TachyonFile = null
- var tachyonDirId: String = null
- var tries = 0
- val rand = new Random()
- while (!foundLocalDir && tries < ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS) {
- tries += 1
- try {
- tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
- try {
- foundLocalDir = client.mkdir(path)
- tachyonDir = client.loadMetadata(path)
- } catch {
- case _: FileAlreadyExistsException => // continue
- }
- } catch {
- case NonFatal(e) =>
- logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
- }
- }
- if (!foundLocalDir) {
- logError("Failed " + ExternalBlockStore.MAX_DIR_CREATION_ATTEMPTS
- + " attempts to create tachyon dir in " + rootDir)
- System.exit(ExecutorExitCode.EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR)
- }
- logInfo("Created tachyon directory at " + tachyonDir)
- tachyonDir
- }
- }
-
- override def shutdown() {
- logDebug("Shutdown hook called")
- tachyonDirs.foreach { tachyonDir =>
- try {
- if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
- deleteRecursively(tachyonDir, client)
- }
- } catch {
- case NonFatal(e) =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
- }
- }
- }
-
- /**
- * Delete a file or directory and its contents recursively.
- */
- private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
- client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
- }
-
- // Register the tachyon path to be deleted via shutdown hook
- private def registerShutdownDeleteDir(file: TachyonFile) {
- val absolutePath = client.getInfo(file).getPath
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths += absolutePath
- }
- }
-
- // Remove the tachyon path to be deleted via shutdown hook
- private def removeShutdownDeleteDir(file: TachyonFile) {
- val absolutePath = client.getInfo(file).getPath
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths -= absolutePath
- }
- }
-
- // Is the path already registered to be deleted via a shutdown hook ?
- private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
- val absolutePath = client.getInfo(file).getPath
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.contains(absolutePath)
- }
- }
-
- // Note: if file is child of some registered path, while not equal to it, then return true;
- // else false. This is to ensure that two shutdown hooks do not try to delete each others
- // paths - resulting in Exception and incomplete cleanup.
- private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
- val absolutePath = client.getInfo(file).getPath
- val hasRoot = shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.exists(
- path => !absolutePath.equals(path) && absolutePath.startsWith(path))
- }
- if (hasRoot) {
- logInfo(s"path = $absolutePath, already present as root for deletion.")
- }
- hasRoot
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 04f584621e..c9bb49b83e 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -54,7 +54,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
- "Size in ExternalBlockStore",
"Size on Disk")
/** Render an HTML row representing an RDD */
@@ -71,7 +70,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
<td>{rdd.numCachedPartitions.toString}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
- <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
<td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
// scalastyle:on
@@ -104,7 +102,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Executor ID",
"Address",
"Total Size in Memory",
- "Total Size in ExternalBlockStore",
"Total Size on Disk",
"Stream Blocks")
@@ -119,9 +116,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
<td sorttable_customkey={status.totalMemSize.toString}>
{Utils.bytesToString(status.totalMemSize)}
</td>
- <td sorttable_customkey={status.totalExternalBlockStoreSize.toString}>
- {Utils.bytesToString(status.totalExternalBlockStoreSize)}
- </td>
<td sorttable_customkey={status.totalDiskSize.toString}>
{Utils.bytesToString(status.totalDiskSize)}
</td>
@@ -195,8 +189,6 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
("Memory", block.memSize)
} else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) {
("Memory Serialized", block.memSize)
- } else if (block.storageLevel.useOffHeap) {
- ("External", block.externalBlockStoreSize)
} else {
throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}")
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a62fd2f339..a6460bc8b8 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -409,14 +409,12 @@ private[spark] object JsonProtocol {
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
- ("ExternalBlockStore Size" -> rddInfo.externalBlockStoreSize) ~
("Disk Size" -> rddInfo.diskSize)
}
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
- ("Use ExternalBlockStore" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
@@ -425,7 +423,6 @@ private[spark] object JsonProtocol {
val storageLevel = storageLevelToJson(blockStatus.storageLevel)
("Storage Level" -> storageLevel) ~
("Memory Size" -> blockStatus.memSize) ~
- ("ExternalBlockStore Size" -> blockStatus.externalBlockStoreSize) ~
("Disk Size" -> blockStatus.diskSize)
}
@@ -867,15 +864,11 @@ private[spark] object JsonProtocol {
val numPartitions = (json \ "Number of Partitions").extract[Int]
val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
val memSize = (json \ "Memory Size").extract[Long]
- // fallback to tachyon for backward compatibility
- val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
- .getOrElse(json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
- rddInfo.externalBlockStoreSize = externalBlockStoreSize
rddInfo.diskSize = diskSize
rddInfo
}
@@ -883,22 +876,16 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
- // fallback to tachyon for backward compatability
- val useExternalBlockStore = (json \ "Use ExternalBlockStore").toSome
- .getOrElse(json \ "Use Tachyon").extract[Boolean]
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
- StorageLevel(useDisk, useMemory, useExternalBlockStore, deserialized, replication)
+ StorageLevel(useDisk, useMemory, deserialized, replication)
}
def blockStatusFromJson(json: JValue): BlockStatus = {
val storageLevel = storageLevelFromJson(json \ "Storage Level")
val memorySize = (json \ "Memory Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]
- // fallback to tachyon for backward compatability
- val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
- .getOrElse(json \ "Tachyon Size").extract[Long]
- BlockStatus(storageLevel, memorySize, diskSize, externalBlockStoreSize)
+ BlockStatus(storageLevel, memorySize, diskSize)
}
def executorInfoFromJson(json: JValue): ExecutorInfo = {
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index f2924a6a5c..3b2368798c 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -102,14 +102,14 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
// We can evict enough blocks to fulfill the request for space
mm.releaseStorageMemory(numBytesToFree)
args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
- (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
// We need to add this call so that the suite-level `evictedBlocks` is updated when
// execution evicts storage; in that case, args.last will not be equal to evictedBlocks
// because it will be a temporary buffer created inside of the MemoryManager rather than
// being passed in by the test code.
if (!(evictedBlocks eq args.last)) {
evictedBlocks.append(
- (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
}
true
} else {
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index e694f5e5e7..2802cd9752 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.rdd
-import org.mockito.Mockito.spy
-
import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
@@ -46,10 +44,6 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
assert(transform(StorageLevel.MEMORY_AND_DISK_SER) === StorageLevel.MEMORY_AND_DISK_SER)
assert(transform(StorageLevel.MEMORY_AND_DISK_2) === StorageLevel.MEMORY_AND_DISK_2)
assert(transform(StorageLevel.MEMORY_AND_DISK_SER_2) === StorageLevel.MEMORY_AND_DISK_SER_2)
- // Off-heap is not supported and Spark should fail fast
- intercept[SparkException] {
- transform(StorageLevel.OFF_HEAP)
- }
}
test("basic lineage truncation") {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 62e6c4f793..0f31561170 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -121,11 +121,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
test("StorageLevel object caching") {
- val level1 = StorageLevel(false, false, false, false, 3)
+ val level1 = StorageLevel(false, false, false, 3)
// this should return the same object as level1
- val level2 = StorageLevel(false, false, false, false, 3)
+ val level2 = StorageLevel(false, false, false, 3)
// this should return a different object
- val level3 = StorageLevel(false, false, false, false, 2)
+ val level3 = StorageLevel(false, false, false, 2)
assert(level2 === level1, "level2 is not same as level1")
assert(level2.eq(level1), "level2 is not the same object as level1")
assert(level3 != level1, "level3 is same as level1")
@@ -562,26 +562,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
}
- test("tachyon storage") {
- // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
- val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
- conf.set(ExternalBlockStore.BLOCK_MANAGER_NAME, ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
- if (tachyonUnitTestEnabled) {
- store = makeBlockManager(1200)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
- store.putSingle("a1", a1, StorageLevel.OFF_HEAP)
- store.putSingle("a2", a2, StorageLevel.OFF_HEAP)
- store.putSingle("a3", a3, StorageLevel.OFF_HEAP)
- assert(store.getSingle("a3").isDefined, "a3 was in store")
- assert(store.getSingle("a2").isDefined, "a2 was in store")
- assert(store.getSingle("a1").isDefined, "a1 was in store")
- } else {
- info("tachyon storage test disabled.")
- }
- }
-
test("on-disk storage") {
store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
index d7ffde1e78..06acca3943 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
@@ -34,16 +34,14 @@ class BlockStatusListenerSuite extends SparkFunSuite {
StreamBlockId(0, 100),
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
- diskSize = 100,
- externalBlockStoreSize = 0)))
+ diskSize = 100)))
// The new block status should be added to the listener
val expectedBlock = BlockUIData(
StreamBlockId(0, 100),
"localhost:10000",
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
- diskSize = 100,
- externalBlockStoreSize = 0
+ diskSize = 100
)
val expectedExecutorStreamBlockStatus = Seq(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
@@ -60,15 +58,13 @@ class BlockStatusListenerSuite extends SparkFunSuite {
StreamBlockId(0, 100),
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
- diskSize = 100,
- externalBlockStoreSize = 0)))
+ diskSize = 100)))
val expectedBlock2 = BlockUIData(
StreamBlockId(0, 100),
"localhost:10001",
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
- diskSize = 100,
- externalBlockStoreSize = 0
+ diskSize = 100
)
// Each block manager should contain one block
val expectedExecutorStreamBlockStatus2 = Set(
@@ -84,8 +80,7 @@ class BlockStatusListenerSuite extends SparkFunSuite {
StreamBlockId(0, 100),
StorageLevel.NONE, // StorageLevel.NONE means removing it
memSize = 0,
- diskSize = 0,
- externalBlockStoreSize = 0)))
+ diskSize = 0)))
// Only the first block manager contains a block
val expectedExecutorStreamBlockStatus3 = Set(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
@@ -102,8 +97,7 @@ class BlockStatusListenerSuite extends SparkFunSuite {
StreamBlockId(0, 100),
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
- diskSize = 100,
- externalBlockStoreSize = 0)))
+ diskSize = 100)))
// The second block manager is removed so we should not see the new block
val expectedExecutorStreamBlockStatus4 = Seq(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 1a199beb35..355d80d068 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -82,9 +82,9 @@ class StorageStatusListenerSuite extends SparkFunSuite {
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
- val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
- val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
- val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+ val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
+ val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
+ val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))
@@ -105,9 +105,9 @@ class StorageStatusListenerSuite extends SparkFunSuite {
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
// Task end with dropped blocks
- val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
- val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
- val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+ val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L))
+ val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L))
+ val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
@@ -130,9 +130,9 @@ class StorageStatusListenerSuite extends SparkFunSuite {
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
- val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
- val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
- val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+ val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
+ val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
+ val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index 1d5a813a4d..e5733aebf6 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -33,10 +33,9 @@ class StorageSuite extends SparkFunSuite {
assert(status.memUsed === 0L)
assert(status.memRemaining === 1000L)
assert(status.diskUsed === 0L)
- assert(status.offHeapUsed === 0L)
- status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 1L))
- status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 1L))
- status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L))
status
}
@@ -50,18 +49,16 @@ class StorageSuite extends SparkFunSuite {
assert(status.memUsed === 30L)
assert(status.memRemaining === 970L)
assert(status.diskUsed === 60L)
- assert(status.offHeapUsed === 3L)
}
test("storage status update non-RDD blocks") {
val status = storageStatus1
- status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 1L))
- status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L))
+ status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L))
+ status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L))
assert(status.blocks.size === 3)
assert(status.memUsed === 160L)
assert(status.memRemaining === 840L)
assert(status.diskUsed === 140L)
- assert(status.offHeapUsed === 2L)
}
test("storage status remove non-RDD blocks") {
@@ -73,20 +70,19 @@ class StorageSuite extends SparkFunSuite {
assert(status.memUsed === 10L)
assert(status.memRemaining === 990L)
assert(status.diskUsed === 20L)
- assert(status.offHeapUsed === 1L)
}
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
private def storageStatus2: StorageStatus = {
val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
assert(status.rddBlocks.isEmpty)
- status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
- status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
- status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 1L))
- status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 1L))
- status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 1L))
- status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L, 0L))
- status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L))
+ status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L))
+ status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L))
status
}
@@ -113,9 +109,6 @@ class StorageSuite extends SparkFunSuite {
assert(status.diskUsedByRdd(0) === 20L)
assert(status.diskUsedByRdd(1) === 200L)
assert(status.diskUsedByRdd(2) === 80L)
- assert(status.offHeapUsedByRdd(0) === 1L)
- assert(status.offHeapUsedByRdd(1) === 1L)
- assert(status.offHeapUsedByRdd(2) === 1L)
assert(status.rddStorageLevel(0) === Some(memAndDisk))
assert(status.rddStorageLevel(1) === Some(memAndDisk))
assert(status.rddStorageLevel(2) === Some(memAndDisk))
@@ -124,15 +117,14 @@ class StorageSuite extends SparkFunSuite {
assert(status.rddBlocksById(10).isEmpty)
assert(status.memUsedByRdd(10) === 0L)
assert(status.diskUsedByRdd(10) === 0L)
- assert(status.offHeapUsedByRdd(10) === 0L)
assert(status.rddStorageLevel(10) === None)
}
test("storage status update RDD blocks") {
val status = storageStatus2
- status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L, 0L))
- status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L))
- status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L, 0L))
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L))
+ status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L))
assert(status.blocks.size === 7)
assert(status.rddBlocks.size === 5)
assert(status.rddBlocksById(0).size === 1)
@@ -144,9 +136,6 @@ class StorageSuite extends SparkFunSuite {
assert(status.diskUsedByRdd(0) === 0L)
assert(status.diskUsedByRdd(1) === 200L)
assert(status.diskUsedByRdd(2) === 1060L)
- assert(status.offHeapUsedByRdd(0) === 0L)
- assert(status.offHeapUsedByRdd(1) === 1L)
- assert(status.offHeapUsedByRdd(2) === 0L)
}
test("storage status remove RDD blocks") {
@@ -170,9 +159,6 @@ class StorageSuite extends SparkFunSuite {
assert(status.diskUsedByRdd(0) === 20L)
assert(status.diskUsedByRdd(1) === 0L)
assert(status.diskUsedByRdd(2) === 20L)
- assert(status.offHeapUsedByRdd(0) === 1L)
- assert(status.offHeapUsedByRdd(1) === 0L)
- assert(status.offHeapUsedByRdd(2) === 0L)
}
test("storage status containsBlock") {
@@ -209,17 +195,17 @@ class StorageSuite extends SparkFunSuite {
val status = storageStatus2
assert(status.blocks.size === status.numBlocks)
assert(status.rddBlocks.size === status.numRddBlocks)
- status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L, 100L))
- status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L, 100L))
- status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L))
+ status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L))
+ status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L))
assert(status.blocks.size === status.numBlocks)
assert(status.rddBlocks.size === status.numRddBlocks)
assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
- status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L, 400L))
- status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
- status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
- status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L))
+ status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L))
+ status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L))
+ status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L))
assert(status.blocks.size === status.numBlocks)
assert(status.rddBlocks.size === status.numRddBlocks)
assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
@@ -244,29 +230,24 @@ class StorageSuite extends SparkFunSuite {
val status = storageStatus2
def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
- def actualOffHeapUsed: Long = status.blocks.values.map(_.externalBlockStoreSize).sum
assert(status.memUsed === actualMemUsed)
assert(status.diskUsed === actualDiskUsed)
- assert(status.offHeapUsed === actualOffHeapUsed)
- status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L, 6000L))
- status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L, 600L))
- status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L, 60L))
+ status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L))
+ status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L))
+ status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
assert(status.memUsed === actualMemUsed)
assert(status.diskUsed === actualDiskUsed)
- assert(status.offHeapUsed === actualOffHeapUsed)
- status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L, 6L))
- status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L, 6L))
- status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L))
+ status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L))
assert(status.memUsed === actualMemUsed)
assert(status.diskUsed === actualDiskUsed)
- assert(status.offHeapUsed === actualOffHeapUsed)
status.removeBlock(TestBlockId("fire"))
status.removeBlock(TestBlockId("man"))
status.removeBlock(RDDBlockId(2, 2))
status.removeBlock(RDDBlockId(2, 3))
assert(status.memUsed === actualMemUsed)
assert(status.diskUsed === actualDiskUsed)
- assert(status.offHeapUsed === actualOffHeapUsed)
}
// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
@@ -274,14 +255,14 @@ class StorageSuite extends SparkFunSuite {
val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
- status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
- status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
+ status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
+ status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
+ status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L))
+ status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L))
+ status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L))
+ status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L))
+ status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L))
Seq(status1, status2, status3)
}
@@ -334,9 +315,9 @@ class StorageSuite extends SparkFunSuite {
test("StorageUtils.getRddBlockLocations with multiple locations") {
val storageStatuses = stockStorageStatuses
- storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
- storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
- storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L))
+ storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L))
+ storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
assert(blockLocations0.size === 5)
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
index 3dab15a9d4..350c174e24 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.ui.storage
-import scala.xml.Utility
-
import org.mockito.Mockito._
import org.apache.spark.SparkFunSuite
@@ -64,26 +62,24 @@ class StoragePageSuite extends SparkFunSuite {
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
- "Size in ExternalBlockStore",
"Size on Disk")
assert((xmlNodes \\ "th").map(_.text) === headers)
assert((xmlNodes \\ "tr").size === 3)
assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
- Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B", "0.0 B"))
+ Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B"))
// Check the url
assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
Some("http://localhost:4040/storage/rdd?id=1"))
assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) ===
- Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "0.0 B", "200.0 B"))
+ Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "200.0 B"))
// Check the url
assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
Some("http://localhost:4040/storage/rdd?id=2"))
assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) ===
- Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "0.0 B",
- "500.0 B"))
+ Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "500.0 B"))
// Check the url
assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
Some("http://localhost:4040/storage/rdd?id=3"))
@@ -98,16 +94,14 @@ class StoragePageSuite extends SparkFunSuite {
"localhost:1111",
StorageLevel.MEMORY_ONLY,
memSize = 100,
- diskSize = 0,
- externalBlockStoreSize = 0)
+ diskSize = 0)
assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock))
val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0),
"localhost:1111",
StorageLevel.MEMORY_ONLY_SER,
memSize = 100,
- diskSize = 0,
- externalBlockStoreSize = 0)
+ diskSize = 0)
assert(("Memory Serialized", 100) ===
storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock))
@@ -115,18 +109,8 @@ class StoragePageSuite extends SparkFunSuite {
"localhost:1111",
StorageLevel.DISK_ONLY,
memSize = 0,
- diskSize = 100,
- externalBlockStoreSize = 0)
+ diskSize = 100)
assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock))
-
- val externalBlock = BlockUIData(StreamBlockId(0, 0),
- "localhost:1111",
- StorageLevel.OFF_HEAP,
- memSize = 0,
- diskSize = 0,
- externalBlockStoreSize = 100)
- assert(("External", 100) ===
- storagePage.streamBlockStorageLevelDescriptionAndSize(externalBlock))
}
test("receiverBlockTables") {
@@ -135,14 +119,12 @@ class StoragePageSuite extends SparkFunSuite {
"localhost:10000",
StorageLevel.MEMORY_ONLY,
memSize = 100,
- diskSize = 0,
- externalBlockStoreSize = 0),
+ diskSize = 0),
BlockUIData(StreamBlockId(1, 1),
"localhost:10000",
StorageLevel.DISK_ONLY,
memSize = 0,
- diskSize = 100,
- externalBlockStoreSize = 0)
+ diskSize = 100)
)
val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0)
@@ -151,20 +133,12 @@ class StoragePageSuite extends SparkFunSuite {
"localhost:10001",
StorageLevel.MEMORY_ONLY,
memSize = 100,
- diskSize = 0,
- externalBlockStoreSize = 0),
- BlockUIData(StreamBlockId(2, 2),
- "localhost:10001",
- StorageLevel.OFF_HEAP,
- memSize = 0,
- diskSize = 0,
- externalBlockStoreSize = 200),
+ diskSize = 0),
BlockUIData(StreamBlockId(1, 1),
"localhost:10001",
StorageLevel.MEMORY_ONLY_SER,
memSize = 100,
- diskSize = 0,
- externalBlockStoreSize = 0)
+ diskSize = 0)
)
val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1)
val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1))
@@ -174,16 +148,15 @@ class StoragePageSuite extends SparkFunSuite {
"Executor ID",
"Address",
"Total Size in Memory",
- "Total Size in ExternalBlockStore",
"Total Size on Disk",
"Stream Blocks")
assert((executorTable \\ "th").map(_.text) === executorHeaders)
assert((executorTable \\ "tr").size === 2)
assert(((executorTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
- Seq("0", "localhost:10000", "100.0 B", "0.0 B", "100.0 B", "2"))
+ Seq("0", "localhost:10000", "100.0 B", "100.0 B", "2"))
assert(((executorTable \\ "tr")(1) \\ "td").map(_.text.trim) ===
- Seq("1", "localhost:10001", "200.0 B", "200.0 B", "0.0 B", "3"))
+ Seq("1", "localhost:10001", "200.0 B", "0.0 B", "2"))
val blockTable = (xmlNodes \\ "table")(1)
val blockHeaders = Seq(
@@ -194,7 +167,7 @@ class StoragePageSuite extends SparkFunSuite {
"Size")
assert((blockTable \\ "th").map(_.text) === blockHeaders)
- assert((blockTable \\ "tr").size === 5)
+ assert((blockTable \\ "tr").size === 4)
assert(((blockTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B"))
// Check "rowspan=2" for the first 2 columns
@@ -212,17 +185,10 @@ class StoragePageSuite extends SparkFunSuite {
assert(((blockTable \\ "tr")(3) \\ "td").map(_.text.trim) ===
Seq("localhost:10001", "Memory Serialized", "100.0 B"))
-
- assert(((blockTable \\ "tr")(4) \\ "td").map(_.text.trim) ===
- Seq("input-2-2", "1", "localhost:10001", "External", "200.0 B"))
- // Check "rowspan=1" for the first 2 columns
- assert(((blockTable \\ "tr")(4) \\ "td")(0).attribute("rowspan").map(_.text) === Some("1"))
- assert(((blockTable \\ "tr")(4) \\ "td")(1).attribute("rowspan").map(_.text) === Some("1"))
}
test("empty receiverBlockTables") {
assert(storagePage.receiverBlockTables(Seq.empty).isEmpty)
-
val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty)
val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 4b838a8ab1..5ac922c217 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -128,20 +128,17 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
// Task end with a few new persisted blocks, some from the same RDD
val metrics1 = new TaskMetrics
metrics1.updatedBlocks = Some(Seq(
- (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
- (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
- (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
- (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
+ (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
+ (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
+ (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
))
bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
- assert(storageListener._rddInfoMap(0).memSize === 800L)
+ assert(storageListener._rddInfoMap(0).memSize === 400L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
- assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
+ assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
assert(storageListener._rddInfoMap(0).isCached)
assert(storageListener._rddInfoMap(1).memSize === 0L)
assert(storageListener._rddInfoMap(1).diskSize === 240L)
- assert(storageListener._rddInfoMap(1).externalBlockStoreSize === 0L)
assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
assert(storageListener._rddInfoMap(1).isCached)
assert(!storageListener._rddInfoMap(2).isCached)
@@ -150,16 +147,15 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
// Task end with a few dropped blocks
val metrics2 = new TaskMetrics
metrics2.updatedBlocks = Some(Seq(
- (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
- (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
- (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
- (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
+ (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
+ (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
+ (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
+ (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L)) // doesn't actually exist
))
bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
- assert(storageListener._rddInfoMap(0).memSize === 400L)
+ assert(storageListener._rddInfoMap(0).memSize === 0L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).externalBlockStoreSize === 200L)
- assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
+ assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
assert(storageListener._rddInfoMap(0).isCached)
assert(!storageListener._rddInfoMap(1).isCached)
assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
@@ -175,8 +171,8 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
val taskMetrics0 = new TaskMetrics
val taskMetrics1 = new TaskMetrics
- val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
- val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
+ val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
+ val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
taskMetrics0.updatedBlocks = Some(Seq(block0))
taskMetrics1.updatedBlocks = Some(Seq(block1))
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6566400e63..068e8397c8 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -801,7 +801,7 @@ class JsonProtocolSuite extends SparkFunSuite {
}
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
- (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i))
+ (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
}.toSeq)
t
}
@@ -867,14 +867,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 501
| }
| ],
@@ -1063,12 +1061,10 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1149,12 +1145,10 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1235,12 +1229,10 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
@@ -1270,14 +1262,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 500
| }
| ],
@@ -1314,14 +1304,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 1000
| },
| {
@@ -1332,14 +1320,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 1001
| }
| ],
@@ -1376,14 +1362,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 1500
| },
| {
@@ -1394,14 +1378,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 1501
| },
| {
@@ -1412,14 +1394,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 1502
| }
| ],
@@ -1456,14 +1436,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 2000
| },
| {
@@ -1474,14 +1452,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 2001
| },
| {
@@ -1492,14 +1468,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 2002
| },
| {
@@ -1510,14 +1484,12 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": true,
| "Replication": 1
| },
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 2003
| }
| ],
@@ -1723,12 +1695,10 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
- | "Use ExternalBlockStore": false,
| "Deserialized": false,
| "Replication": 2
| },
| "Memory Size": 0,
- | "ExternalBlockStore Size": 0,
| "Disk Size": 0
| }
| }
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index fb2e91e1ee..0760529b57 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -35,7 +35,7 @@ commons-configuration-1.6.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
-commons-io-2.4.jar
+commons-io-2.1.jar
commons-lang-2.6.jar
commons-lang3-3.3.2.jar
commons-logging-1.1.3.jar
@@ -179,10 +179,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 59e4d4f839..191f2a0e4e 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -170,10 +170,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index e4395c872c..9134e997c8 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -171,10 +171,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 89fd15da7d..8c45832873 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -177,10 +177,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-tachyon-client-0.8.2.jar
-tachyon-underfs-hdfs-0.8.2.jar
-tachyon-underfs-local-0.8.2.jar
-tachyon-underfs-s3-0.8.2.jar
uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
deleted file mode 100644
index 8b739c9d7c..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples
-
-import java.util.Random
-
-import scala.math.exp
-
-import breeze.linalg.{DenseVector, Vector}
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Logistic regression based classification.
- * This example uses Tachyon to persist rdds during computation.
- *
- * This is an example implementation for learning how to use Spark. For more conventional use,
- * please refer to either org.apache.spark.mllib.classification.LogisticRegressionWithSGD or
- * org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS based on your needs.
- */
-object SparkTachyonHdfsLR {
- val D = 10 // Number of dimensions
- val rand = new Random(42)
-
- def showWarning() {
- System.err.println(
- """WARN: This is a naive implementation of Logistic Regression and is given as an example!
- |Please use either org.apache.spark.mllib.classification.LogisticRegressionWithSGD or
- |org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
- |for more conventional use.
- """.stripMargin)
- }
-
- case class DataPoint(x: Vector[Double], y: Double)
-
- def parsePoint(line: String): DataPoint = {
- val tok = new java.util.StringTokenizer(line, " ")
- var y = tok.nextToken.toDouble
- var x = new Array[Double](D)
- var i = 0
- while (i < D) {
- x(i) = tok.nextToken.toDouble; i += 1
- }
- DataPoint(new DenseVector(x), y)
- }
-
- def main(args: Array[String]) {
-
- showWarning()
-
- val inputPath = args(0)
- val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR")
- val conf = new Configuration()
- val sc = new SparkContext(sparkConf)
- val lines = sc.textFile(inputPath)
- val points = lines.map(parsePoint).persist(StorageLevel.OFF_HEAP)
- val ITERATIONS = args(1).toInt
-
- // Initialize w to a random value
- var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
- println("Initial w: " + w)
-
- for (i <- 1 to ITERATIONS) {
- println("On iteration " + i)
- val gradient = points.map { p =>
- p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
- }.reduce(_ + _)
- w -= gradient
- }
-
- println("Final w: " + w)
- sc.stop()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
deleted file mode 100644
index e46ac655be..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples
-
-import scala.math.random
-
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Computes an approximation to pi
- * This example uses Tachyon to persist rdds during computation.
- */
-object SparkTachyonPi {
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("SparkTachyonPi")
- val spark = new SparkContext(sparkConf)
-
- val slices = if (args.length > 0) args(0).toInt else 2
- val n = 100000 * slices
-
- val rdd = spark.parallelize(1 to n, slices)
- rdd.persist(StorageLevel.OFF_HEAP)
- val count = rdd.map { i =>
- val x = random * 2 - 1
- val y = random * 2 - 1
- if (x * x + y * y < 1) 1 else 0
- }.reduce(_ + _)
- println("Pi is roughly " + 4.0 * count / n)
-
- spark.stop()
- }
-}
-// scalastyle:on println
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4206d1fada..ccd3c34bb5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -120,7 +120,11 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.toArray"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.defaultMinSplits"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.clearJars"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.clearFiles")
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.clearFiles"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.externalBlockStoreFolderName"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockManager"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore")
) ++
// SPARK-12665 Remove deprecated and unused classes
Seq(