author     Josh Rosen <joshrosen@databricks.com>   2016-03-21 17:19:39 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2016-03-21 17:19:39 -0700
commit     b5f1ab701a167a728bb006e01b392b203da84391 (patch)
tree       af002e40214b1029e7d6f99b7b61389dbdac6895
parent     b3e5af62a1c9e11556c4721164e6539d7ecce8e7 (diff)
[SPARK-13990] Automatically pick serializer when caching RDDs
Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this patch modifies Spark's BlockManager to use an RDD's ClassTag to select the best serializer when caching RDD blocks.

When storing a local block, the BlockManager `put()` methods use implicits to capture ClassTags and store those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the ClassTag is written into the block transfer metadata so that it is also recorded in the remote BlockManager.

There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work only because the missing ClassTag is always `ClassTag.Any` in those cases, but it may be worth looking more carefully at those call sites to see whether we should be more explicit.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
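The core idea can be seen in miniature below. This is a hedged sketch, not Spark's actual implementation: `getSerializer` is the real entry point this patch calls on SerializerManager, but the primitive/String dispatch shown here is an assumption about its internals (based on SPARK-13926), and `SerializerSelectionSketch` is a hypothetical name.

import scala.reflect.ClassTag
import org.apache.spark.serializer.{KryoSerializer, Serializer}

// A minimal sketch of ClassTag-driven serializer selection. The dispatch
// rule below (Kryo for primitives and Strings) is an assumption, not the
// verbatim SerializerManager logic.
class SerializerSelectionSketch(default: Serializer, kryo: KryoSerializer) {
  private val kryoFriendlyTags: Set[ClassTag[_]] = Set(
    ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Short,
    ClassTag.Int, ClassTag.Long, ClassTag.Float, ClassTag.Double,
    ClassTag(classOf[String]))

  // Pick a serializer that is known to be safe for this element type.
  def getSerializer(ct: ClassTag[_]): Serializer =
    if (kryoFriendlyTags.contains(ct)) kryo else default
}

Because methods such as `putIterator[T: ClassTag]` use a context bound, the tag is captured implicitly at most call sites; where no implicit is available (as in `getOrElseUpdate`), the tag is passed explicitly as `elementClassTag`.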
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                                        |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/BlockDataManager.scala                        |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/network/BlockTransferService.scala                    |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala               |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala         |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                                         |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala                        |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                            | 146
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala                      |  99
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala                                |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala                   |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala            |   8
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                       |  49
-rw-r--r--  project/MimaExcludes.scala                                                                 |   3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala  |   3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala        |   8
16 files changed, 226 insertions(+), 155 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 459fab88ce..e2c47ceda2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -331,7 +331,7 @@ object SparkEnv extends Logging {
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
- serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
+ serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index cc5e851c29..8f83668d79 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -17,6 +17,8 @@
package org.apache.spark.network
+import scala.reflect.ClassTag
+
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -35,7 +37,11 @@ trait BlockDataManager {
* Returns true if the block was stored and false if the put operation failed or the block
* already existed.
*/
- def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean
+ def putBlockData(
+ blockId: BlockId,
+ data: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Boolean
/**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 2de0f2033f..e43e3a2de2 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -76,7 +77,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Future[Unit]
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit]
/**
* A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
@@ -114,7 +116,9 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Unit = {
- Await.result(uploadBlock(hostname, port, execId, blockId, blockData, level), Duration.Inf)
+ level: StorageLevel,
+ classTag: ClassTag[_]): Unit = {
+ val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
+ Await.result(future, Duration.Inf)
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index c1dbca5db2..2ed8a00df7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -20,6 +20,8 @@ package org.apache.spark.network.netty
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
import org.apache.spark.network.BlockDataManager
@@ -61,12 +63,16 @@ class NettyBlockRpcServer(
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
case uploadBlock: UploadBlock =>
- // StorageLevel is serialized as bytes using our JavaSerializer.
- val level: StorageLevel =
- serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+ // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
+ val (level: StorageLevel, classTag: ClassTag[_]) = {
+ serializer
+ .newInstance()
+ .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+ .asInstanceOf[(StorageLevel, ClassTag[_])]
+ }
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
- blockManager.putBlockData(blockId, data, level)
+ blockManager.putBlockData(blockId, data, level, classTag)
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f588a28eed..5f3d4532dd 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
+import scala.reflect.ClassTag
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
@@ -118,18 +119,19 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Future[Unit] = {
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
val result = Promise[Unit]()
val client = clientFactory.createClient(hostname, port)
- // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded
- // using our binary protocol.
- val levelBytes = JavaUtils.bufferToArray(serializer.newInstance().serialize(level))
+ // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
+ // Everything else is encoded using our binary protocol.
+ val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
// Convert or copy nio buffer into array in order to serialize it.
val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
- client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer,
+ client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId")
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8a577c83e1..f96551c793 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -326,7 +326,7 @@ abstract class RDD[T: ClassTag](
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// This method is called on executors, so we need call SparkEnv.get instead of sc.env.
- SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
+ SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
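For reference, `elementClassTag` is simply the materialized tag from the RDD's own `[T: ClassTag]` context bound, so each cached partition carries its element type down into the BlockManager. A toy illustration (MiniRDD is a stand-in, not Spark's RDD):

import scala.reflect.ClassTag

// MiniRDD only shows where the tag in the getOrElseUpdate call comes from:
// the context bound materializes a ClassTag[T] once per RDD type.
abstract class MiniRDD[T: ClassTag] {
  def elementClassTag: ClassTag[T] = implicitly[ClassTag[T]]
}

object MiniRDDDemo {
  def main(args: Array[String]): Unit = {
    val doubles = new MiniRDD[Double] {}
    println(doubles.elementClassTag) // prints "Double"
  }
}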
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 46fab7a899..94d11c5be5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.reflect.ClassTag
import com.google.common.collect.ConcurrentHashMultiset
@@ -37,10 +38,14 @@ import org.apache.spark.internal.Logging
* @param level the block's storage level. This is the requested persistence level, not the
* effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this
* does not imply that the block is actually resident in memory).
+ * @param classTag the block's [[ClassTag]], used to select the serializer
* @param tellMaster whether state changes for this block should be reported to the master. This
* is true for most blocks, but is false for broadcast blocks.
*/
-private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+private[storage] class BlockInfo(
+ val level: StorageLevel,
+ val classTag: ClassTag[_],
+ val tellMaster: Boolean) {
/**
* The size of the block (in bytes)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa2561d8c3..83f8c5c37d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal
@@ -37,7 +38,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.{Serializer, SerializerInstance}
+import org.apache.spark.serializer.{Serializer, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
import org.apache.spark.util._
@@ -59,7 +60,7 @@ private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
- defaultSerializer: Serializer,
+ serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
@@ -294,8 +295,12 @@ private[spark] class BlockManager(
/**
* Put the block locally, using the given storage level.
*/
- override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
- putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
+ override def putBlockData(
+ blockId: BlockId,
+ data: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Boolean = {
+ putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
}
/**
@@ -417,7 +422,7 @@ private[spark] class BlockManager(
val iter: Iterator[Any] = if (level.deserialized) {
memoryStore.getValues(blockId).get
} else {
- dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+ dataDeserialize(blockId, memoryStore.getBytes(blockId).get)(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
@@ -425,10 +430,11 @@ private[spark] class BlockManager(
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
- val diskValues = dataDeserialize(blockId, diskBytes)
+ val diskValues = dataDeserialize(blockId, diskBytes)(info.classTag)
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
} else {
- dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes))
+ val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
+ dataDeserialize(blockId, bytes)(info.classTag)
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
@@ -502,7 +508,7 @@ private[spark] class BlockManager(
*
* This does not acquire a lock on this block in this JVM.
*/
- def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+ private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
}
@@ -633,12 +639,13 @@ private[spark] class BlockManager(
* @return either a BlockResult if the block was successfully cached, or an iterator if the block
* could not be cached.
*/
- def getOrElseUpdate(
+ def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
- makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
+ classTag: ClassTag[T],
+ makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// Initially we hold no locks on this block.
- doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+ doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
@@ -664,13 +671,13 @@ private[spark] class BlockManager(
/**
* @return true if the block was stored or false if an error occurred.
*/
- def putIterator(
+ def putIterator[T: ClassTag](
blockId: BlockId,
- values: Iterator[Any],
+ values: Iterator[T],
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
- doPutIterator(blockId, () => values, level, tellMaster) match {
+ doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
case None =>
true
case Some(iter) =>
@@ -703,13 +710,13 @@ private[spark] class BlockManager(
*
* @return true if the block was stored or false if an error occurred.
*/
- def putBytes(
+ def putBytes[T: ClassTag](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
- doPutBytes(blockId, bytes, level, tellMaster)
+ doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
}
/**
@@ -723,13 +730,14 @@ private[spark] class BlockManager(
* returns.
* @return true if the block was already present or if the put succeeded, false otherwise.
*/
- private def doPutBytes(
+ private def doPutBytes[T](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
+ classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
- doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
@@ -737,7 +745,7 @@ private[spark] class BlockManager(
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
- replicate(blockId, bytes, level)
+ replicate(blockId, bytes, level, classTag)
}(futureExecutionContext)
} else {
null
@@ -749,8 +757,8 @@ private[spark] class BlockManager(
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
- val values = dataDeserialize(blockId, bytes)
- memoryStore.putIterator(blockId, values, level) match {
+ val values = dataDeserialize(blockId, bytes)(classTag)
+ memoryStore.putIterator(blockId, values, level, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly to
@@ -769,14 +777,14 @@ private[spark] class BlockManager(
diskStore.putBytes(blockId, bytes)
}
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
- putBlockInfo.size = size
+ info.size = size
if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
@@ -804,6 +812,7 @@ private[spark] class BlockManager(
private def doPut[T](
blockId: BlockId,
level: StorageLevel,
+ classTag: ClassTag[_],
tellMaster: Boolean,
keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
@@ -811,7 +820,7 @@ private[spark] class BlockManager(
require(level != null && level.isValid, "StorageLevel is null or invalid")
val putBlockInfo = {
- val newInfo = new BlockInfo(level, tellMaster)
+ val newInfo = new BlockInfo(level, classTag, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
newInfo
} else {
@@ -864,21 +873,22 @@ private[spark] class BlockManager(
* @return None if the block was already present or if the put succeeded, or Some(iterator)
* if the put failed.
*/
- private def doPutIterator(
+ private def doPutIterator[T](
blockId: BlockId,
- iterator: () => Iterator[Any],
+ iterator: () => Iterator[T],
level: StorageLevel,
+ classTag: ClassTag[T],
tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = {
- doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
+ doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
- var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None
+ var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
// Size of the block in bytes
var size = 0L
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
- memoryStore.putIterator(blockId, iterator(), level) match {
+ memoryStore.putIterator(blockId, iterator(), level, classTag) match {
case Right(s) =>
size = s
case Left(iter) =>
@@ -886,7 +896,7 @@ private[spark] class BlockManager(
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { fileOutputStream =>
- dataSerializeStream(blockId, fileOutputStream, iter)
+ dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
}
size = diskStore.getSize(blockId)
} else {
@@ -895,19 +905,19 @@ private[spark] class BlockManager(
}
} else if (level.useDisk) {
diskStore.put(blockId) { fileOutputStream =>
- dataSerializeStream(blockId, fileOutputStream, iterator())
+ dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
}
size = diskStore.getSize(blockId)
}
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
- putBlockInfo.size = size
+ info.size = size
if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
@@ -915,9 +925,9 @@ private[spark] class BlockManager(
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
- val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+ val bytesToReplicate = doGetLocalBytes(blockId, info)
try {
- replicate(blockId, bytesToReplicate, level)
+ replicate(blockId, bytesToReplicate, level, classTag)
} finally {
bytesToReplicate.dispose()
}
@@ -978,12 +988,13 @@ private[spark] class BlockManager(
* @return a copy of the iterator. The original iterator passed this method should no longer
* be used after this method returns.
*/
- private def maybeCacheDiskValuesInMemory(
+ private def maybeCacheDiskValuesInMemory[T](
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
- diskIterator: Iterator[Any]): Iterator[Any] = {
+ diskIterator: Iterator[T]): Iterator[T] = {
require(level.deserialized)
+ val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
@@ -992,7 +1003,7 @@ private[spark] class BlockManager(
// Note: if we had a means to discard the disk iterator, we would do that here.
memoryStore.getValues(blockId).get
} else {
- memoryStore.putIterator(blockId, diskIterator, level) match {
+ memoryStore.putIterator(blockId, diskIterator, level, classTag) match {
case Left(iter) =>
// The memory store put() failed, so it returned the iterator back to us:
iter
@@ -1001,7 +1012,7 @@ private[spark] class BlockManager(
memoryStore.getValues(blockId).get
}
}
- }
+ }.asInstanceOf[Iterator[T]]
} else {
diskIterator
}
@@ -1027,7 +1038,11 @@ private[spark] class BlockManager(
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
- private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = {
+ private def replicate(
+ blockId: BlockId,
+ data: ChunkedByteBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
@@ -1087,7 +1102,8 @@ private[spark] class BlockManager(
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
- tLevel)
+ tLevel,
+ classTag)
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
@@ -1132,9 +1148,9 @@ private[spark] class BlockManager(
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
*/
- def putSingle(
+ def putSingle[T: ClassTag](
blockId: BlockId,
- value: Any,
+ value: T,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
putIterator(blockId, Iterator(value), level, tellMaster)
@@ -1151,9 +1167,9 @@ private[spark] class BlockManager(
*
* @return the block's new effective StorageLevel.
*/
- def dropFromMemory(
+ private[storage] def dropFromMemory[T: ClassTag](
blockId: BlockId,
- data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
+ data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
var blockIsUpdated = false
@@ -1165,7 +1181,10 @@ private[spark] class BlockManager(
data() match {
case Left(elements) =>
diskStore.put(blockId) { fileOutputStream =>
- dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+ dataSerializeStream(
+ blockId,
+ fileOutputStream,
+ elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
}
case Right(bytes) =>
diskStore.putBytes(blockId, bytes)
@@ -1271,17 +1290,17 @@ private[spark] class BlockManager(
}
/** Serializes into a stream. */
- def dataSerializeStream(
+ def dataSerializeStream[T: ClassTag](
blockId: BlockId,
outputStream: OutputStream,
- values: Iterator[Any]): Unit = {
+ values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
- val ser = defaultSerializer.newInstance()
+ val ser = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
/** Serializes into a chunked byte buffer. */
- def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
+ def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
@@ -1291,29 +1310,22 @@ private[spark] class BlockManager(
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
- bytes.rewind()
- dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
- }
-
- /**
- * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
- * the iterator is reached.
- */
- def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = {
- dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
+ def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
+ dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
}
/**
* Deserializes a InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
+ def dataDeserializeStream[T: ClassTag](
+ blockId: BlockId,
+ inputStream: InputStream): Iterator[T] = {
val stream = new BufferedInputStream(inputStream)
- defaultSerializer
+ serializerManager.getSerializer(implicitly[ClassTag[T]])
.newInstance()
.deserializeStream(wrapForCompression(blockId, stream))
- .asIterator
+ .asIterator.asInstanceOf[Iterator[T]]
}
def stop(): Unit = {
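Two implicit-passing styles recur throughout the BlockManager changes above: context bounds (`[T: ClassTag]`) that thread the tag invisibly through nested calls, and explicit application (`dataDeserialize(blockId, bytes)(info.classTag)`) where the tag is only available as a stored value. A self-contained toy showing both, with hypothetical names:

import scala.reflect.ClassTag

object ClassTagThreading {
  // Context bound: the compiler supplies ClassTag[T] at the call site.
  def dataSerializeSketch[T: ClassTag](values: Iterator[T]): Vector[String] =
    values.map(v => s"${implicitly[ClassTag[T]]}:$v").toVector

  // Stored tag: no implicit in scope, so apply the saved tag explicitly,
  // mirroring the dataDeserialize(...)(info.classTag) pattern above.
  def fromStoredTag(values: Iterator[_], stored: ClassTag[_]): Vector[String] =
    dataSerializeSketch(values.asInstanceOf[Iterator[Any]])(
      stored.asInstanceOf[ClassTag[Any]])

  def main(args: Array[String]): Unit = {
    println(dataSerializeSketch(Iterator(1, 2)))                  // tag inferred: Int
    println(fromStoredTag(Iterator("a"), ClassTag(classOf[String])))
  }
}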
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 94171324f8..d370ee912a 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,6 +21,7 @@ import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
@@ -30,11 +31,18 @@ import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.ChunkedByteBuffer
-private sealed trait MemoryEntry {
- val size: Long
+private sealed trait MemoryEntry[T] {
+ def size: Long
+ def classTag: ClassTag[T]
}
-private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry
-private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry
+private case class DeserializedMemoryEntry[T](
+ value: Array[T],
+ size: Long,
+ classTag: ClassTag[T]) extends MemoryEntry[T]
+private case class SerializedMemoryEntry[T](
+ buffer: ChunkedByteBuffer,
+ size: Long,
+ classTag: ClassTag[T]) extends MemoryEntry[T]
/**
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
@@ -49,7 +57,7 @@ private[spark] class MemoryStore(
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
- private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+ private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
@@ -95,13 +103,16 @@ private[spark] class MemoryStore(
*
* @return true if the put() succeeded, false otherwise.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = {
+ def putBytes[T: ClassTag](
+ blockId: BlockId,
+ size: Long,
+ _bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
if (memoryManager.acquireStorageMemory(blockId, size)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
- val entry = new SerializedMemoryEntry(bytes, size)
+ val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
@@ -129,10 +140,11 @@ private[spark] class MemoryStore(
* iterator or call `close()` on it in order to free the storage memory consumed by the
* partially-unrolled block.
*/
- private[storage] def putIterator(
+ private[storage] def putIterator[T](
blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
+ values: Iterator[T],
+ level: StorageLevel,
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
@@ -151,7 +163,7 @@ private[spark] class MemoryStore(
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
- var vector = new SizeTrackingVector[Any]
+ var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
@@ -187,10 +199,10 @@ private[spark] class MemoryStore(
val arrayValues = vector.toArray
vector = null
val entry = if (level.deserialized) {
- new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
+ new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
} else {
- val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
- new SerializedMemoryEntry(bytes, bytes.size)
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag)
+ new SerializedMemoryEntry[T](bytes, bytes.size, classTag)
}
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
@@ -248,19 +260,21 @@ private[spark] class MemoryStore(
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
- case e: DeserializedMemoryEntry =>
+ case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
- case SerializedMemoryEntry(bytes, _) => Some(bytes)
+ case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
- def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ def getValues(blockId: BlockId): Option[Iterator[_]] = {
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
- case e: SerializedMemoryEntry =>
+ case e: SerializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getValues on deserialized blocks")
- case DeserializedMemoryEntry(values, _) => Some(values.iterator)
+ case DeserializedMemoryEntry(values, _, _) => Some(values.iterator)
}
}
@@ -333,6 +347,24 @@ private[spark] class MemoryStore(
}
}
+ def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
+ val data = entry match {
+ case DeserializedMemoryEntry(values, _, _) => Left(values)
+ case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
+ }
+ val newEffectiveStorageLevel =
+ blockManager.dropFromMemory(blockId, () => data)(entry.classTag)
+ if (newEffectiveStorageLevel.isValid) {
+ // The block is still present in at least one store, so release the lock
+ // but don't delete the block info
+ blockManager.releaseLock(blockId)
+ } else {
+ // The block isn't present in any store, so delete the block info so that the
+ // block can be stored again
+ blockManager.blockInfoManager.removeBlock(blockId)
+ }
+ }
+
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
@@ -341,20 +373,7 @@ private[spark] class MemoryStore(
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
- val data = entry match {
- case DeserializedMemoryEntry(values, _) => Left(values)
- case SerializedMemoryEntry(buffer, _) => Right(buffer)
- }
- val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
- if (newEffectiveStorageLevel.isValid) {
- // The block is still present in at least one store, so release the lock
- // but don't delete the block info
- blockManager.releaseLock(blockId)
- } else {
- // The block isn't present in any store, so delete the block info so that the
- // block can be stored again
- blockManager.blockInfoManager.removeBlock(blockId)
- }
+ dropBlock(blockId, entry)
}
}
freedMemory
@@ -470,16 +489,16 @@ private[spark] class MemoryStore(
* @param unrolled an iterator for the partially-unrolled values.
* @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
*/
-private[storage] class PartiallyUnrolledIterator(
+private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
unrollMemory: Long,
- unrolled: Iterator[Any],
- rest: Iterator[Any])
- extends Iterator[Any] {
+ unrolled: Iterator[T],
+ rest: Iterator[T])
+ extends Iterator[T] {
private[this] var unrolledIteratorIsConsumed: Boolean = false
- private[this] var iter: Iterator[Any] = {
- val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, {
+ private[this] var iter: Iterator[T] = {
+ val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
unrolledIteratorIsConsumed = true
memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
})
@@ -487,7 +506,7 @@ private[storage] class PartiallyUnrolledIterator(
}
override def hasNext: Boolean = iter.hasNext
- override def next(): Any = iter.next()
+ override def next(): T = iter.next()
/**
* Called to dispose of this iterator and free its memory.
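The typed `MemoryEntry[T]` hierarchy above pairs each cached payload with its ClassTag, so that code like `dropBlock` can re-serialize with the right serializer after the block has been evicted from memory. A stripped-down, standalone version of that ADT (all names here are stand-ins, not the Spark classes):

import scala.reflect.ClassTag

sealed trait EntrySketch[T] { def size: Long; def classTag: ClassTag[T] }
case class DeserializedSketch[T](value: Array[T], size: Long, classTag: ClassTag[T])
  extends EntrySketch[T]
case class SerializedSketch[T](bytes: Array[Byte], size: Long, classTag: ClassTag[T])
  extends EntrySketch[T]

object EntryDemo {
  // Pattern matching recovers the payload and its tag together, the way
  // dropBlock does before handing data back to dropFromMemory.
  def describe[T](e: EntrySketch[T]): String = e match {
    case DeserializedSketch(values, size, ct) =>
      s"${values.length} objects of $ct, estimated $size bytes"
    case SerializedSketch(bytes, _, ct) =>
      s"${bytes.length} serialized bytes of $ct"
  }

  def main(args: Array[String]): Unit = {
    println(describe(DeserializedSketch(Array(1, 2, 3), 48L, ClassTag.Int)))
    println(describe(SerializedSketch("hi".getBytes, 2L, ClassTag(classOf[String]))))
  }
}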
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 1c3f2bc315..2732cd6749 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.Matchers
import org.scalatest.time.{Millis, Span}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.util.io.ChunkedByteBuffer
class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
@@ -196,8 +197,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
blockManager.master.getLocations(blockId).foreach { cmId =>
val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
blockId.toString)
- val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer())
- .asInstanceOf[Iterator[Int]].toList
+ val deserialized = blockManager.dataDeserialize[Int](blockId,
+ new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
assert(deserialized === (1 to 100).toList)
}
}
@@ -222,7 +223,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val numPartitions = 10
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
- .set("spark.testing.memory", (size * numPartitions).toString)
+ .set("spark.testing.memory", size.toString)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index fe83fc722a..7ee76aa4c6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.implicitConversions
+import scala.reflect.ClassTag
import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
@@ -52,7 +53,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
}
private def newBlockInfo(): BlockInfo = {
- new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)
+ new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)
}
private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b78a3648cd..98e8450fa1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.StorageLevel._
@@ -62,7 +62,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val store = new BlockManager(name, rpcEnv, master, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(store.memoryStore)
store.initialize("app-id")
@@ -262,7 +263,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1)
- val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
memManager.setMemoryStore(failableStore.memoryStore)
failableStore.initialize("app-id")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index edf5cd35e4..9419dfaa00 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -24,6 +24,7 @@ import scala.concurrent.duration._
import scala.concurrent.Future
import scala.language.implicitConversions
import scala.language.postfixOps
+import scala.reflect.ClassTag
import org.mockito.{Matchers => mc}
import org.mockito.Mockito.{mock, times, verify, when}
@@ -40,7 +41,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._
@@ -77,7 +78,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val transfer = transferService
.getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
blockManager.initialize("app-id")
@@ -821,8 +823,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
maxOnHeapExecutionMemory = Long.MaxValue,
maxStorageMemory = 1200,
numCores = 1)
+ val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
- new JavaSerializer(conf), conf, memoryManager, mapOutputTracker,
+ serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
memoryManager.setMemoryStore(store.memoryStore)
@@ -1074,7 +1077,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with all the space in the world. This should succeed.
- var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ var putResult =
+ memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
@@ -1085,7 +1089,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed after kicking out someBlock1.
assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY))
assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY))
- putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ putResult =
+ memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
@@ -1099,7 +1104,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY))
- putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY)
+ putResult =
+ memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
@@ -1121,8 +1127,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
- val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(result1.isRight) // unroll was successful
@@ -1137,7 +1143,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1147,7 +1153,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -1175,7 +1181,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1191,7 +1197,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// the block may be stored to disk. During the unrolling process, block "b2" should be kicked
// out, so the memory store should contain only b3, while the disk store should contain
// b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any)
assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -1211,28 +1217,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// All unroll memory used is released because putIterator did not return an iterator
- assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight)
+ assert(memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight)
+ assert(memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because putIterator returned an iterator
// that still depends on the underlying vector used in the process
- assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b4", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b5", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b6", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b7", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1244,7 +1250,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
store.blockInfoManager.lockNewBlockForWriting(
- blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
+ blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
memoryStore.putBytes(blockId, 13000, () => {
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
})
@@ -1340,7 +1346,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Future[Unit] = {
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
import scala.concurrent.ExecutionContext.Implicits.global
Future {}
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 13caa54d06..68e9c50d60 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -569,6 +569,9 @@ object MimaExcludes {
if missing.map(_.fullName).sameElements(Seq("org.apache.spark.Logging")) => false
case _ => true
}
+ ) ++ Seq(
+ // [SPARK-13990] Automatically pick serializer when caching RDDs
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock")
)
case v if v.startsWith("1.6") =>
Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 8625882b04..ace67a639c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -161,7 +161,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
- blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+ blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+ .asInstanceOf[Iterator[T]]
}
if (partition.isBlockIdValid) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 76f67ed601..122ca0627f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -34,12 +34,13 @@ import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer
class ReceivedBlockHandlerSuite
extends SparkFunSuite
@@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite
val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
val bytes = reader.read(fileSegment)
reader.close()
- blockManager.dataDeserialize(generateBlockId(), bytes).toList
+ blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
}
loggedData shouldEqual data
}
@@ -264,7 +265,8 @@ class ReceivedBlockHandlerSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
blockManager.initialize("app-id")