-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/BlockDataManager.scala  |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/network/BlockTransferService.scala  |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala  |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala  |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala  |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  |  146
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala  |  99
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala  |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala  |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala  |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala  |  49
-rw-r--r--  project/MimaExcludes.scala  |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala  |  3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala  |  8
16 files changed, 226 insertions(+), 155 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 459fab88ce..e2c47ceda2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -331,7 +331,7 @@ object SparkEnv extends Logging {
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
- serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
+ serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index cc5e851c29..8f83668d79 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -17,6 +17,8 @@
package org.apache.spark.network
+import scala.reflect.ClassTag
+
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -35,7 +37,11 @@ trait BlockDataManager {
* Returns true if the block was stored and false if the put operation failed or the block
* already existed.
*/
- def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean
+ def putBlockData(
+ blockId: BlockId,
+ data: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Boolean
/**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 2de0f2033f..e43e3a2de2 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -76,7 +77,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Future[Unit]
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit]
/**
* A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
@@ -114,7 +116,9 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Unit = {
- Await.result(uploadBlock(hostname, port, execId, blockId, blockData, level), Duration.Inf)
+ level: StorageLevel,
+ classTag: ClassTag[_]): Unit = {
+ val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
+ Await.result(future, Duration.Inf)
}
}
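The metadata carried on every upload now pairs the storage level with the block's element type. A minimal sketch of calling the blocking wrapper above; the host, port, executor id, and the in-scope blockTransferService value are hypothetical, and ClassTag.Any stands in when no precise element type is known:

    import java.nio.ByteBuffer
    import scala.reflect.ClassTag
    import org.apache.spark.network.buffer.NioManagedBuffer
    import org.apache.spark.storage.{StorageLevel, TestBlockId}

    blockTransferService.uploadBlockSync(
      hostname = "remote-host",          // hypothetical peer
      port = 7077,                       // hypothetical port
      execId = "exec-1",
      blockId = TestBlockId("upload-demo"),
      blockData = new NioManagedBuffer(ByteBuffer.allocate(0)),
      level = StorageLevel.MEMORY_ONLY,
      classTag = ClassTag.Any)           // no element type known for raw bytes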
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index c1dbca5db2..2ed8a00df7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -20,6 +20,8 @@ package org.apache.spark.network.netty
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
import org.apache.spark.network.BlockDataManager
@@ -61,12 +63,16 @@ class NettyBlockRpcServer(
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
case uploadBlock: UploadBlock =>
- // StorageLevel is serialized as bytes using our JavaSerializer.
- val level: StorageLevel =
- serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+ // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
+ val (level: StorageLevel, classTag: ClassTag[_]) = {
+ serializer
+ .newInstance()
+ .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+ .asInstanceOf[(StorageLevel, ClassTag[_])]
+ }
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
- blockManager.putBlockData(blockId, data, level)
+ blockManager.putBlockData(blockId, data, level, classTag)
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f588a28eed..5f3d4532dd 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
+import scala.reflect.ClassTag
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
@@ -118,18 +119,19 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Future[Unit] = {
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
val result = Promise[Unit]()
val client = clientFactory.createClient(hostname, port)
- // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded
- // using our binary protocol.
- val levelBytes = JavaUtils.bufferToArray(serializer.newInstance().serialize(level))
+ // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
+ // Everything else is encoded using our binary protocol.
+ val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
// Convert or copy nio buffer into array in order to serialize it.
val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
- client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer,
+ client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId")
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8a577c83e1..f96551c793 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -326,7 +326,7 @@ abstract class RDD[T: ClassTag](
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// This method is called on executors, so we need call SparkEnv.get instead of sc.env.
- SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
+ SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
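getOrElseUpdate is now generic in the element type, so a cache miss unrolls the iterator under the RDD's own ClassTag instead of a blanket ClassTag[Any]. A sketch of the call shape RDD.getOrCompute uses; the helper and its names are illustrative, not Spark source:

    import scala.reflect.ClassTag
    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.{RDDBlockId, StorageLevel}

    def cacheOrCompute[T: ClassTag](
        rddId: Int,
        split: Int,
        compute: () => Iterator[T]): Iterator[T] = {
      val blockId = RDDBlockId(rddId, split)
      SparkEnv.get.blockManager.getOrElseUpdate(
          blockId, StorageLevel.MEMORY_AND_DISK, implicitly[ClassTag[T]], compute) match {
        case Left(blockResult) =>
          // Block was (or already is) cached; its data comes back untyped.
          blockResult.data.asInstanceOf[Iterator[T]]
        case Right(iter) =>
          // Block could not be cached; fall back to the freshly computed iterator.
          iter
      }
    }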
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 46fab7a899..94d11c5be5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.reflect.ClassTag
import com.google.common.collect.ConcurrentHashMultiset
@@ -37,10 +38,14 @@ import org.apache.spark.internal.Logging
* @param level the block's storage level. This is the requested persistence level, not the
* effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this
* does not imply that the block is actually resident in memory).
+ * @param classTag the block's [[ClassTag]], used to select the serializer
* @param tellMaster whether state changes for this block should be reported to the master. This
* is true for most blocks, but is false for broadcast blocks.
*/
-private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+private[storage] class BlockInfo(
+ val level: StorageLevel,
+ val classTag: ClassTag[_],
+ val tellMaster: Boolean) {
/**
* The size of the block (in bytes)
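BlockInfo now pins the tag when the write lock is first taken, so later readers in BlockManager (see the getLocalValues and dropFromMemory hunks below) can deserialize the block without the caller restating its type. A sketch of constructing one, mirroring the test suites further down; note the class is private[storage]:

    import scala.reflect.ClassTag
    import org.apache.spark.storage.{BlockInfo, StorageLevel}

    val info = new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)
    assert(info.classTag == ClassTag.Any)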
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa2561d8c3..83f8c5c37d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal
@@ -37,7 +38,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.{Serializer, SerializerInstance}
+import org.apache.spark.serializer.{Serializer, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
import org.apache.spark.util._
@@ -59,7 +60,7 @@ private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
- defaultSerializer: Serializer,
+ serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
@@ -294,8 +295,12 @@ private[spark] class BlockManager(
/**
* Put the block locally, using the given storage level.
*/
- override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
- putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
+ override def putBlockData(
+ blockId: BlockId,
+ data: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Boolean = {
+ putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
}
/**
@@ -417,7 +422,7 @@ private[spark] class BlockManager(
val iter: Iterator[Any] = if (level.deserialized) {
memoryStore.getValues(blockId).get
} else {
- dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+ dataDeserialize(blockId, memoryStore.getBytes(blockId).get)(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
@@ -425,10 +430,11 @@ private[spark] class BlockManager(
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
- val diskValues = dataDeserialize(blockId, diskBytes)
+ val diskValues = dataDeserialize(blockId, diskBytes)(info.classTag)
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
} else {
- dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes))
+ val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
+ dataDeserialize(blockId, bytes)(info.classTag)
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
@@ -502,7 +508,7 @@ private[spark] class BlockManager(
*
* This does not acquire a lock on this block in this JVM.
*/
- def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+ private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
}
@@ -633,12 +639,13 @@ private[spark] class BlockManager(
* @return either a BlockResult if the block was successfully cached, or an iterator if the block
* could not be cached.
*/
- def getOrElseUpdate(
+ def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
- makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
+ classTag: ClassTag[T],
+ makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// Initially we hold no locks on this block.
- doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+ doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
@@ -664,13 +671,13 @@ private[spark] class BlockManager(
/**
* @return true if the block was stored or false if an error occurred.
*/
- def putIterator(
+ def putIterator[T: ClassTag](
blockId: BlockId,
- values: Iterator[Any],
+ values: Iterator[T],
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
- doPutIterator(blockId, () => values, level, tellMaster) match {
+ doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
case None =>
true
case Some(iter) =>
@@ -703,13 +710,13 @@ private[spark] class BlockManager(
*
* @return true if the block was stored or false if an error occurred.
*/
- def putBytes(
+ def putBytes[T: ClassTag](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
- doPutBytes(blockId, bytes, level, tellMaster)
+ doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
}
/**
@@ -723,13 +730,14 @@ private[spark] class BlockManager(
* returns.
* @return true if the block was already present or if the put succeeded, false otherwise.
*/
- private def doPutBytes(
+ private def doPutBytes[T](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
+ classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
- doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
@@ -737,7 +745,7 @@ private[spark] class BlockManager(
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
- replicate(blockId, bytes, level)
+ replicate(blockId, bytes, level, classTag)
}(futureExecutionContext)
} else {
null
@@ -749,8 +757,8 @@ private[spark] class BlockManager(
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
- val values = dataDeserialize(blockId, bytes)
- memoryStore.putIterator(blockId, values, level) match {
+ val values = dataDeserialize(blockId, bytes)(classTag)
+ memoryStore.putIterator(blockId, values, level, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly to
@@ -769,14 +777,14 @@ private[spark] class BlockManager(
diskStore.putBytes(blockId, bytes)
}
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
- putBlockInfo.size = size
+ info.size = size
if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
@@ -804,6 +812,7 @@ private[spark] class BlockManager(
private def doPut[T](
blockId: BlockId,
level: StorageLevel,
+ classTag: ClassTag[_],
tellMaster: Boolean,
keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
@@ -811,7 +820,7 @@ private[spark] class BlockManager(
require(level != null && level.isValid, "StorageLevel is null or invalid")
val putBlockInfo = {
- val newInfo = new BlockInfo(level, tellMaster)
+ val newInfo = new BlockInfo(level, classTag, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
newInfo
} else {
@@ -864,21 +873,22 @@ private[spark] class BlockManager(
* @return None if the block was already present or if the put succeeded, or Some(iterator)
* if the put failed.
*/
- private def doPutIterator(
+ private def doPutIterator[T](
blockId: BlockId,
- iterator: () => Iterator[Any],
+ iterator: () => Iterator[T],
level: StorageLevel,
+ classTag: ClassTag[T],
tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = {
- doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+ keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
+ doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
- var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None
+ var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
// Size of the block in bytes
var size = 0L
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
- memoryStore.putIterator(blockId, iterator(), level) match {
+ memoryStore.putIterator(blockId, iterator(), level, classTag) match {
case Right(s) =>
size = s
case Left(iter) =>
@@ -886,7 +896,7 @@ private[spark] class BlockManager(
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { fileOutputStream =>
- dataSerializeStream(blockId, fileOutputStream, iter)
+ dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
}
size = diskStore.getSize(blockId)
} else {
@@ -895,19 +905,19 @@ private[spark] class BlockManager(
}
} else if (level.useDisk) {
diskStore.put(blockId) { fileOutputStream =>
- dataSerializeStream(blockId, fileOutputStream, iterator())
+ dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
}
size = diskStore.getSize(blockId)
}
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
- putBlockInfo.size = size
+ info.size = size
if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+ reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
@@ -915,9 +925,9 @@ private[spark] class BlockManager(
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
- val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+ val bytesToReplicate = doGetLocalBytes(blockId, info)
try {
- replicate(blockId, bytesToReplicate, level)
+ replicate(blockId, bytesToReplicate, level, classTag)
} finally {
bytesToReplicate.dispose()
}
@@ -978,12 +988,13 @@ private[spark] class BlockManager(
* @return a copy of the iterator. The original iterator passed to this method should no longer
* be used after this method returns.
*/
- private def maybeCacheDiskValuesInMemory(
+ private def maybeCacheDiskValuesInMemory[T](
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
- diskIterator: Iterator[Any]): Iterator[Any] = {
+ diskIterator: Iterator[T]): Iterator[T] = {
require(level.deserialized)
+ val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
@@ -992,7 +1003,7 @@ private[spark] class BlockManager(
// Note: if we had a means to discard the disk iterator, we would do that here.
memoryStore.getValues(blockId).get
} else {
- memoryStore.putIterator(blockId, diskIterator, level) match {
+ memoryStore.putIterator(blockId, diskIterator, level, classTag) match {
case Left(iter) =>
// The memory store put() failed, so it returned the iterator back to us:
iter
@@ -1001,7 +1012,7 @@ private[spark] class BlockManager(
memoryStore.getValues(blockId).get
}
}
- }
+ }.asInstanceOf[Iterator[T]]
} else {
diskIterator
}
@@ -1027,7 +1038,11 @@ private[spark] class BlockManager(
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
- private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = {
+ private def replicate(
+ blockId: BlockId,
+ data: ChunkedByteBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
@@ -1087,7 +1102,8 @@ private[spark] class BlockManager(
peer.executorId,
blockId,
new NettyManagedBuffer(data.toNetty),
- tLevel)
+ tLevel,
+ classTag)
logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
@@ -1132,9 +1148,9 @@ private[spark] class BlockManager(
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
*/
- def putSingle(
+ def putSingle[T: ClassTag](
blockId: BlockId,
- value: Any,
+ value: T,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
putIterator(blockId, Iterator(value), level, tellMaster)
@@ -1151,9 +1167,9 @@ private[spark] class BlockManager(
*
* @return the block's new effective StorageLevel.
*/
- def dropFromMemory(
+ private[storage] def dropFromMemory[T: ClassTag](
blockId: BlockId,
- data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
+ data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
var blockIsUpdated = false
@@ -1165,7 +1181,10 @@ private[spark] class BlockManager(
data() match {
case Left(elements) =>
diskStore.put(blockId) { fileOutputStream =>
- dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+ dataSerializeStream(
+ blockId,
+ fileOutputStream,
+ elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
}
case Right(bytes) =>
diskStore.putBytes(blockId, bytes)
@@ -1271,17 +1290,17 @@ private[spark] class BlockManager(
}
/** Serializes into a stream. */
- def dataSerializeStream(
+ def dataSerializeStream[T: ClassTag](
blockId: BlockId,
outputStream: OutputStream,
- values: Iterator[Any]): Unit = {
+ values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
- val ser = defaultSerializer.newInstance()
+ val ser = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
/** Serializes into a chunked byte buffer. */
- def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
+ def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
@@ -1291,29 +1310,22 @@ private[spark] class BlockManager(
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
- bytes.rewind()
- dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
- }
-
- /**
- * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
- * the iterator is reached.
- */
- def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = {
- dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
+ def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
+ dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
}
/**
* Deserializes an InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
+ def dataDeserializeStream[T: ClassTag](
+ blockId: BlockId,
+ inputStream: InputStream): Iterator[T] = {
val stream = new BufferedInputStream(inputStream)
- defaultSerializer
+ serializerManager.getSerializer(implicitly[ClassTag[T]])
.newInstance()
.deserializeStream(wrapForCompression(blockId, stream))
- .asIterator
+ .asIterator.asInstanceOf[Iterator[T]]
}
def stop(): Unit = {
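The net effect of swapping defaultSerializer for serializerManager is that the serializer is now chosen per block from its ClassTag, while compression wrapping stays keyed on the BlockId as before. A sketch of the symmetric round-trip this enables, with bm standing in for an initialized BlockManager:

    import org.apache.spark.storage.TestBlockId

    val blockId = TestBlockId("round-trip")
    // ClassTag[Int] is inferred and routed to serializerManager.getSerializer(...).
    val bytes = bm.dataSerialize(blockId, Iterator(1, 2, 3))
    // The read side must supply the same tag to select the same serializer.
    val back = bm.dataDeserialize[Int](blockId, bytes).toList
    assert(back == List(1, 2, 3))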
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 94171324f8..d370ee912a 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,6 +21,7 @@ import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
@@ -30,11 +31,18 @@ import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.ChunkedByteBuffer
-private sealed trait MemoryEntry {
- val size: Long
+private sealed trait MemoryEntry[T] {
+ def size: Long
+ def classTag: ClassTag[T]
}
-private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry
-private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry
+private case class DeserializedMemoryEntry[T](
+ value: Array[T],
+ size: Long,
+ classTag: ClassTag[T]) extends MemoryEntry[T]
+private case class SerializedMemoryEntry[T](
+ buffer: ChunkedByteBuffer,
+ size: Long,
+ classTag: ClassTag[T]) extends MemoryEntry[T]
/**
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
@@ -49,7 +57,7 @@ private[spark] class MemoryStore(
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
- private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+ private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
@@ -95,13 +103,16 @@ private[spark] class MemoryStore(
*
* @return true if the put() succeeded, false otherwise.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = {
+ def putBytes[T: ClassTag](
+ blockId: BlockId,
+ size: Long,
+ _bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
if (memoryManager.acquireStorageMemory(blockId, size)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
- val entry = new SerializedMemoryEntry(bytes, size)
+ val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
@@ -129,10 +140,11 @@ private[spark] class MemoryStore(
* iterator or call `close()` on it in order to free the storage memory consumed by the
* partially-unrolled block.
*/
- private[storage] def putIterator(
+ private[storage] def putIterator[T](
blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
+ values: Iterator[T],
+ level: StorageLevel,
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
@@ -151,7 +163,7 @@ private[spark] class MemoryStore(
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
- var vector = new SizeTrackingVector[Any]
+ var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
@@ -187,10 +199,10 @@ private[spark] class MemoryStore(
val arrayValues = vector.toArray
vector = null
val entry = if (level.deserialized) {
- new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
+ new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
} else {
- val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
- new SerializedMemoryEntry(bytes, bytes.size)
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag)
+ new SerializedMemoryEntry[T](bytes, bytes.size, classTag)
}
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
@@ -248,19 +260,21 @@ private[spark] class MemoryStore(
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
- case e: DeserializedMemoryEntry =>
+ case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
- case SerializedMemoryEntry(bytes, _) => Some(bytes)
+ case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
- def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ def getValues(blockId: BlockId): Option[Iterator[_]] = {
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
- case e: SerializedMemoryEntry =>
+ case e: SerializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getValues on deserialized blocks")
- case DeserializedMemoryEntry(values, _) => Some(values.iterator)
+ case DeserializedMemoryEntry(values, _, _) =>
+ val x = Some(values)
+ x.map(_.iterator)
}
}
@@ -333,6 +347,24 @@ private[spark] class MemoryStore(
}
}
+ def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
+ val data = entry match {
+ case DeserializedMemoryEntry(values, _, _) => Left(values)
+ case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
+ }
+ val newEffectiveStorageLevel =
+ blockManager.dropFromMemory(blockId, () => data)(entry.classTag)
+ if (newEffectiveStorageLevel.isValid) {
+ // The block is still present in at least one store, so release the lock
+ // but don't delete the block info
+ blockManager.releaseLock(blockId)
+ } else {
+ // The block isn't present in any store, so delete the block info so that the
+ // block can be stored again
+ blockManager.blockInfoManager.removeBlock(blockId)
+ }
+ }
+
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
@@ -341,20 +373,7 @@ private[spark] class MemoryStore(
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
- val data = entry match {
- case DeserializedMemoryEntry(values, _) => Left(values)
- case SerializedMemoryEntry(buffer, _) => Right(buffer)
- }
- val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
- if (newEffectiveStorageLevel.isValid) {
- // The block is still present in at least one store, so release the lock
- // but don't delete the block info
- blockManager.releaseLock(blockId)
- } else {
- // The block isn't present in any store, so delete the block info so that the
- // block can be stored again
- blockManager.blockInfoManager.removeBlock(blockId)
- }
+ dropBlock(blockId, entry)
}
}
freedMemory
@@ -470,16 +489,16 @@ private[spark] class MemoryStore(
* @param unrolled an iterator for the partially-unrolled values.
* @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
*/
-private[storage] class PartiallyUnrolledIterator(
+private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
unrollMemory: Long,
- unrolled: Iterator[Any],
- rest: Iterator[Any])
- extends Iterator[Any] {
+ unrolled: Iterator[T],
+ rest: Iterator[T])
+ extends Iterator[T] {
private[this] var unrolledIteratorIsConsumed: Boolean = false
- private[this] var iter: Iterator[Any] = {
- val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, {
+ private[this] var iter: Iterator[T] = {
+ val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
unrolledIteratorIsConsumed = true
memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
})
@@ -487,7 +506,7 @@ private[storage] class PartiallyUnrolledIterator(
}
override def hasNext: Boolean = iter.hasNext
- override def next(): Any = iter.next()
+ override def next(): T = iter.next()
/**
* Called to dispose of this iterator and free its memory.
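Typing MemoryEntry means an evicted block remembers how it was stored: the dropBlock helper above recovers either the raw value array or the serialized buffer, plus the ClassTag needed to re-serialize values to disk. A sketch of that dispatch, ignoring the trait's private visibility (describe is illustrative only):

    def describe(entry: MemoryEntry[_]): String = entry match {
      case DeserializedMemoryEntry(values, size, tag) =>
        s"deserialized: ${values.length} values of $tag, ~$size bytes estimated"
      case SerializedMemoryEntry(_, size, tag) =>
        s"serialized: $tag, $size bytes"
    }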
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 1c3f2bc315..2732cd6749 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.Matchers
import org.scalatest.time.{Millis, Span}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.util.io.ChunkedByteBuffer
class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
@@ -196,8 +197,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
blockManager.master.getLocations(blockId).foreach { cmId =>
val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
blockId.toString)
- val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer())
- .asInstanceOf[Iterator[Int]].toList
+ val deserialized = blockManager.dataDeserialize[Int](blockId,
+ new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
assert(deserialized === (1 to 100).toList)
}
}
@@ -222,7 +223,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
val numPartitions = 10
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
- .set("spark.testing.memory", (size * numPartitions).toString)
+ .set("spark.testing.memory", size.toString)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index fe83fc722a..7ee76aa4c6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.implicitConversions
+import scala.reflect.ClassTag
import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
@@ -52,7 +53,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
}
private def newBlockInfo(): BlockInfo = {
- new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)
+ new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)
}
private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b78a3648cd..98e8450fa1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.StorageLevel._
@@ -62,7 +62,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val store = new BlockManager(name, rpcEnv, master, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(store.memoryStore)
store.initialize("app-id")
@@ -262,7 +263,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1)
- val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
memManager.setMemoryStore(failableStore.memoryStore)
failableStore.initialize("app-id")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index edf5cd35e4..9419dfaa00 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -24,6 +24,7 @@ import scala.concurrent.duration._
import scala.concurrent.Future
import scala.language.implicitConversions
import scala.language.postfixOps
+import scala.reflect.ClassTag
import org.mockito.{Matchers => mc}
import org.mockito.Mockito.{mock, times, verify, when}
@@ -40,7 +41,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._
@@ -77,7 +78,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
val transfer = transferService
.getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
blockManager.initialize("app-id")
@@ -821,8 +823,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
maxOnHeapExecutionMemory = Long.MaxValue,
maxStorageMemory = 1200,
numCores = 1)
+ val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
- new JavaSerializer(conf), conf, memoryManager, mapOutputTracker,
+ serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
memoryManager.setMemoryStore(store.memoryStore)
@@ -1074,7 +1077,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with all the space in the world. This should succeed.
- var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ var putResult =
+ memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
@@ -1085,7 +1089,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
// Unroll with not enough space. This should succeed after kicking out someBlock1.
assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY))
assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY))
- putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ putResult =
+ memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
@@ -1099,7 +1104,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY))
- putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY)
+ putResult =
+ memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
@@ -1121,8 +1127,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
- val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(result1.isRight) // unroll was successful
@@ -1137,7 +1143,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1147,7 +1153,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -1175,7 +1181,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1191,7 +1197,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
// the block may be stored to disk. During the unrolling process, block "b2" should be kicked
// out, so the memory store should contain only b3, while the disk store should contain
// b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any)
assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -1211,28 +1217,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// All unroll memory used is released because putIterator did not return an iterator
- assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight)
+ assert(memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight)
+ assert(memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because putIterator returned an iterator
// that still depends on the underlying vector used in the process
- assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b4", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b5", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b6", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft)
+ assert(memoryStore.putIterator("b7", smallIterator, memOnly, ClassTag.Any).isLeft)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1244,7 +1250,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
store.blockInfoManager.lockNewBlockForWriting(
- blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
+ blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
memoryStore.putBytes(blockId, 13000, () => {
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
})
@@ -1340,7 +1346,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
port: Int, execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
- level: StorageLevel): Future[Unit] = {
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
import scala.concurrent.ExecutionContext.Implicits.global
Future {}
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 13caa54d06..68e9c50d60 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -569,6 +569,9 @@ object MimaExcludes {
if missing.map(_.fullName).sameElements(Seq("org.apache.spark.Logging")) => false
case _ => true
}
+ ) ++ Seq(
+ // [SPARK-13990] Automatically pick serializer when caching RDDs
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock")
)
case v if v.startsWith("1.6") =>
Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 8625882b04..ace67a639c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -161,7 +161,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
- blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+ blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+ .asInstanceOf[Iterator[T]]
}
if (partition.isBlockIdValid) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 76f67ed601..122ca0627f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -34,12 +34,13 @@ import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer
class ReceivedBlockHandlerSuite
extends SparkFunSuite
@@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite
val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
val bytes = reader.read(fileSegment)
reader.close()
- blockManager.dataDeserialize(generateBlockId(), bytes).toList
+ blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
}
loggedData shouldEqual data
}
@@ -264,7 +265,8 @@ class ReceivedBlockHandlerSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
blockManager.initialize("app-id")