diff options
16 files changed, 226 insertions, 155 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 459fab88ce..e2c47ceda2 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -331,7 +331,7 @@ object SparkEnv extends Logging { // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, - serializer, conf, memoryManager, mapOutputTracker, shuffleManager, + serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index cc5e851c29..8f83668d79 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -17,6 +17,8 @@ package org.apache.spark.network +import scala.reflect.ClassTag + import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.storage.{BlockId, StorageLevel} @@ -35,7 +37,11 @@ trait BlockDataManager { * Returns true if the block was stored and false if the put operation failed or the block * already existed. */ - def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean + def putBlockData( + blockId: BlockId, + data: ManagedBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Boolean /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index 2de0f2033f..e43e3a2de2 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.concurrent.{Await, Future, Promise} import scala.concurrent.duration.Duration +import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -76,7 +77,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Future[Unit] + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] /** * A special case of [[fetchBlocks]], as it fetches only one block and is blocking. @@ -114,7 +116,9 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Unit = { - Await.result(uploadBlock(hostname, port, execId, blockId, blockData, level), Duration.Inf) + level: StorageLevel, + classTag: ClassTag[_]): Unit = { + val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag) + Await.result(future, Duration.Inf) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index c1dbca5db2..2ed8a00df7 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -20,6 +20,8 @@ package org.apache.spark.network.netty import java.nio.ByteBuffer import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.BlockDataManager @@ -61,12 +63,16 @@ class NettyBlockRpcServer( responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer) case uploadBlock: UploadBlock => - // StorageLevel is serialized as bytes using our JavaSerializer. - val level: StorageLevel = - serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata)) + // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer. + val (level: StorageLevel, classTag: ClassTag[_]) = { + serializer + .newInstance() + .deserialize(ByteBuffer.wrap(uploadBlock.metadata)) + .asInstanceOf[(StorageLevel, ClassTag[_])] + } val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData)) val blockId = BlockId(uploadBlock.blockId) - blockManager.putBlockData(blockId, data, level) + blockManager.putBlockData(blockId, data, level, classTag) responseContext.onSuccess(ByteBuffer.allocate(0)) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index f588a28eed..5f3d4532dd 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} +import scala.reflect.ClassTag import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.network._ @@ -118,18 +119,19 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Future[Unit] = { + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] = { val result = Promise[Unit]() val client = clientFactory.createClient(hostname, port) - // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded - // using our binary protocol. - val levelBytes = JavaUtils.bufferToArray(serializer.newInstance().serialize(level)) + // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer. + // Everything else is encoded using our binary protocol. + val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) // Convert or copy nio buffer into array in order to serialize it. val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) - client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer, + client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer, new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { logTrace(s"Successfully uploaded block $blockId") diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8a577c83e1..f96551c793 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -326,7 +326,7 @@ abstract class RDD[T: ClassTag]( val blockId = RDDBlockId(id, partition.index) var readCachedBlock = true // This method is called on executors, so we need call SparkEnv.get instead of sc.env. - SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => { + SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { readCachedBlock = false computeOrReadCheckpoint(partition, context) }) match { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 46fab7a899..94d11c5be5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.reflect.ClassTag import com.google.common.collect.ConcurrentHashMultiset @@ -37,10 +38,14 @@ import org.apache.spark.internal.Logging * @param level the block's storage level. This is the requested persistence level, not the * effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this * does not imply that the block is actually resident in memory). + * @param classTag the block's [[ClassTag]], used to select the serializer * @param tellMaster whether state changes for this block should be reported to the master. This * is true for most blocks, but is false for broadcast blocks. */ -private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { +private[storage] class BlockInfo( + val level: StorageLevel, + val classTag: ClassTag[_], + val tellMaster: Boolean) { /** * The size of the block (in bytes) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index aa2561d8c3..83f8c5c37d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ +import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal @@ -37,7 +38,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExternalShuffleClient import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.rpc.RpcEnv -import org.apache.spark.serializer.{Serializer, SerializerInstance} +import org.apache.spark.serializer.{Serializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ import org.apache.spark.util._ @@ -59,7 +60,7 @@ private[spark] class BlockManager( executorId: String, rpcEnv: RpcEnv, val master: BlockManagerMaster, - defaultSerializer: Serializer, + serializerManager: SerializerManager, val conf: SparkConf, memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, @@ -294,8 +295,12 @@ private[spark] class BlockManager( /** * Put the block locally, using the given storage level. */ - override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = { - putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level) + override def putBlockData( + blockId: BlockId, + data: ManagedBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Boolean = { + putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag) } /** @@ -417,7 +422,7 @@ private[spark] class BlockManager( val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get } else { - dataDeserialize(blockId, memoryStore.getBytes(blockId).get) + dataDeserialize(blockId, memoryStore.getBytes(blockId).get)(info.classTag) } val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) @@ -425,10 +430,11 @@ private[spark] class BlockManager( val iterToReturn: Iterator[Any] = { val diskBytes = diskStore.getBytes(blockId) if (level.deserialized) { - val diskValues = dataDeserialize(blockId, diskBytes) + val diskValues = dataDeserialize(blockId, diskBytes)(info.classTag) maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) } else { - dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)) + val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes) + dataDeserialize(blockId, bytes)(info.classTag) } } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId)) @@ -502,7 +508,7 @@ private[spark] class BlockManager( * * This does not acquire a lock on this block in this JVM. */ - def getRemoteValues(blockId: BlockId): Option[BlockResult] = { + private def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size) } @@ -633,12 +639,13 @@ private[spark] class BlockManager( * @return either a BlockResult if the block was successfully cached, or an iterator if the block * could not be cached. */ - def getOrElseUpdate( + def getOrElseUpdate[T]( blockId: BlockId, level: StorageLevel, - makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = { + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { // Initially we hold no locks on this block. - doPutIterator(blockId, makeIterator, level, keepReadLock = true) match { + doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match { case None => // doPut() didn't hand work back to us, so the block already existed or was successfully // stored. Therefore, we now hold a read lock on the block. @@ -664,13 +671,13 @@ private[spark] class BlockManager( /** * @return true if the block was stored or false if an error occurred. */ - def putIterator( + def putIterator[T: ClassTag]( blockId: BlockId, - values: Iterator[Any], + values: Iterator[T], level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(values != null, "Values is null") - doPutIterator(blockId, () => values, level, tellMaster) match { + doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match { case None => true case Some(iter) => @@ -703,13 +710,13 @@ private[spark] class BlockManager( * * @return true if the block was stored or false if an error occurred. */ - def putBytes( + def putBytes[T: ClassTag]( blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(bytes != null, "Bytes is null") - doPutBytes(blockId, bytes, level, tellMaster) + doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster) } /** @@ -723,13 +730,14 @@ private[spark] class BlockManager( * returns. * @return true if the block was already present or if the put succeeded, false otherwise. */ - private def doPutBytes( + private def doPutBytes[T]( blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel, + classTag: ClassTag[T], tellMaster: Boolean = true, keepReadLock: Boolean = false): Boolean = { - doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo => + doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => val startTimeMs = System.currentTimeMillis // Since we're storing bytes, initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. @@ -737,7 +745,7 @@ private[spark] class BlockManager( Future { // This is a blocking action and should run in futureExecutionContext which is a cached // thread pool - replicate(blockId, bytes, level) + replicate(blockId, bytes, level, classTag) }(futureExecutionContext) } else { null @@ -749,8 +757,8 @@ private[spark] class BlockManager( // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. val putSucceeded = if (level.deserialized) { - val values = dataDeserialize(blockId, bytes) - memoryStore.putIterator(blockId, values, level) match { + val values = dataDeserialize(blockId, bytes)(classTag) + memoryStore.putIterator(blockId, values, level, classTag) match { case Right(_) => true case Left(iter) => // If putting deserialized values in memory failed, we will put the bytes directly to @@ -769,14 +777,14 @@ private[spark] class BlockManager( diskStore.putBytes(blockId, bytes) } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) + val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { // Now that the block is in either the memory, externalBlockStore, or disk store, // tell the master about it. - putBlockInfo.size = size + info.size = size if (tellMaster) { - reportBlockStatus(blockId, putBlockInfo, putBlockStatus) + reportBlockStatus(blockId, info, putBlockStatus) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) @@ -804,6 +812,7 @@ private[spark] class BlockManager( private def doPut[T]( blockId: BlockId, level: StorageLevel, + classTag: ClassTag[_], tellMaster: Boolean, keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = { @@ -811,7 +820,7 @@ private[spark] class BlockManager( require(level != null && level.isValid, "StorageLevel is null or invalid") val putBlockInfo = { - val newInfo = new BlockInfo(level, tellMaster) + val newInfo = new BlockInfo(level, classTag, tellMaster) if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { newInfo } else { @@ -864,21 +873,22 @@ private[spark] class BlockManager( * @return None if the block was already present or if the put succeeded, or Some(iterator) * if the put failed. */ - private def doPutIterator( + private def doPutIterator[T]( blockId: BlockId, - iterator: () => Iterator[Any], + iterator: () => Iterator[T], level: StorageLevel, + classTag: ClassTag[T], tellMaster: Boolean = true, - keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = { - doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo => + keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = { + doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => val startTimeMs = System.currentTimeMillis - var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None + var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None // Size of the block in bytes var size = 0L if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. - memoryStore.putIterator(blockId, iterator(), level) match { + memoryStore.putIterator(blockId, iterator(), level, classTag) match { case Right(s) => size = s case Left(iter) => @@ -886,7 +896,7 @@ private[spark] class BlockManager( if (level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") diskStore.put(blockId) { fileOutputStream => - dataSerializeStream(blockId, fileOutputStream, iter) + dataSerializeStream(blockId, fileOutputStream, iter)(classTag) } size = diskStore.getSize(blockId) } else { @@ -895,19 +905,19 @@ private[spark] class BlockManager( } } else if (level.useDisk) { diskStore.put(blockId) { fileOutputStream => - dataSerializeStream(blockId, fileOutputStream, iterator()) + dataSerializeStream(blockId, fileOutputStream, iterator())(classTag) } size = diskStore.getSize(blockId) } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) + val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { // Now that the block is in either the memory, externalBlockStore, or disk store, // tell the master about it. - putBlockInfo.size = size + info.size = size if (tellMaster) { - reportBlockStatus(blockId, putBlockInfo, putBlockStatus) + reportBlockStatus(blockId, info, putBlockStatus) } Option(TaskContext.get()).foreach { c => c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) @@ -915,9 +925,9 @@ private[spark] class BlockManager( logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis - val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo) + val bytesToReplicate = doGetLocalBytes(blockId, info) try { - replicate(blockId, bytesToReplicate, level) + replicate(blockId, bytesToReplicate, level, classTag) } finally { bytesToReplicate.dispose() } @@ -978,12 +988,13 @@ private[spark] class BlockManager( * @return a copy of the iterator. The original iterator passed this method should no longer * be used after this method returns. */ - private def maybeCacheDiskValuesInMemory( + private def maybeCacheDiskValuesInMemory[T]( blockInfo: BlockInfo, blockId: BlockId, level: StorageLevel, - diskIterator: Iterator[Any]): Iterator[Any] = { + diskIterator: Iterator[T]): Iterator[T] = { require(level.deserialized) + val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]] if (level.useMemory) { // Synchronize on blockInfo to guard against a race condition where two readers both try to // put values read from disk into the MemoryStore. @@ -992,7 +1003,7 @@ private[spark] class BlockManager( // Note: if we had a means to discard the disk iterator, we would do that here. memoryStore.getValues(blockId).get } else { - memoryStore.putIterator(blockId, diskIterator, level) match { + memoryStore.putIterator(blockId, diskIterator, level, classTag) match { case Left(iter) => // The memory store put() failed, so it returned the iterator back to us: iter @@ -1001,7 +1012,7 @@ private[spark] class BlockManager( memoryStore.getValues(blockId).get } } - } + }.asInstanceOf[Iterator[T]] } else { diskIterator } @@ -1027,7 +1038,11 @@ private[spark] class BlockManager( * Replicate block to another node. Not that this is a blocking call that returns after * the block has been replicated. */ - private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = { + private def replicate( + blockId: BlockId, + data: ChunkedByteBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 val peersForReplication = new ArrayBuffer[BlockManagerId] @@ -1087,7 +1102,8 @@ private[spark] class BlockManager( peer.executorId, blockId, new NettyManagedBuffer(data.toNetty), - tLevel) + tLevel, + classTag) logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms" .format(System.currentTimeMillis - onePeerStartTime)) peersReplicatedTo += peer @@ -1132,9 +1148,9 @@ private[spark] class BlockManager( * @return true if the block was stored or false if the block was already stored or an * error occurred. */ - def putSingle( + def putSingle[T: ClassTag]( blockId: BlockId, - value: Any, + value: T, level: StorageLevel, tellMaster: Boolean = true): Boolean = { putIterator(blockId, Iterator(value), level, tellMaster) @@ -1151,9 +1167,9 @@ private[spark] class BlockManager( * * @return the block's new effective StorageLevel. */ - def dropFromMemory( + private[storage] def dropFromMemory[T: ClassTag]( blockId: BlockId, - data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = { + data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false @@ -1165,7 +1181,10 @@ private[spark] class BlockManager( data() match { case Left(elements) => diskStore.put(blockId) { fileOutputStream => - dataSerializeStream(blockId, fileOutputStream, elements.toIterator) + dataSerializeStream( + blockId, + fileOutputStream, + elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]]) } case Right(bytes) => diskStore.putBytes(blockId, bytes) @@ -1271,17 +1290,17 @@ private[spark] class BlockManager( } /** Serializes into a stream. */ - def dataSerializeStream( + def dataSerializeStream[T: ClassTag]( blockId: BlockId, outputStream: OutputStream, - values: Iterator[Any]): Unit = { + values: Iterator[T]): Unit = { val byteStream = new BufferedOutputStream(outputStream) - val ser = defaultSerializer.newInstance() + val ser = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } /** Serializes into a chunked byte buffer. */ - def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = { + def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = { val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4) dataSerializeStream(blockId, byteArrayChunkOutputStream, values) new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)) @@ -1291,29 +1310,22 @@ private[spark] class BlockManager( * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = { - bytes.rewind() - dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true)) - } - - /** - * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of - * the iterator is reached. - */ - def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = { - dataDeserializeStream(blockId, bytes.toInputStream(dispose = true)) + def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = { + dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true)) } /** * Deserializes a InputStream into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = { + def dataDeserializeStream[T: ClassTag]( + blockId: BlockId, + inputStream: InputStream): Iterator[T] = { val stream = new BufferedInputStream(inputStream) - defaultSerializer + serializerManager.getSerializer(implicitly[ClassTag[T]]) .newInstance() .deserializeStream(wrapForCompression(blockId, stream)) - .asIterator + .asIterator.asInstanceOf[Iterator[T]] } def stop(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 94171324f8..d370ee912a 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -21,6 +21,7 @@ import java.util.LinkedHashMap import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging @@ -30,11 +31,18 @@ import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector import org.apache.spark.util.io.ChunkedByteBuffer -private sealed trait MemoryEntry { - val size: Long +private sealed trait MemoryEntry[T] { + def size: Long + def classTag: ClassTag[T] } -private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry -private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry +private case class DeserializedMemoryEntry[T]( + value: Array[T], + size: Long, + classTag: ClassTag[T]) extends MemoryEntry[T] +private case class SerializedMemoryEntry[T]( + buffer: ChunkedByteBuffer, + size: Long, + classTag: ClassTag[T]) extends MemoryEntry[T] /** * Stores blocks in memory, either as Arrays of deserialized Java objects or as @@ -49,7 +57,7 @@ private[spark] class MemoryStore( // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and // acquiring or releasing unroll memory, must be synchronized on `memoryManager`! - private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) + private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true) // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` @@ -95,13 +103,16 @@ private[spark] class MemoryStore( * * @return true if the put() succeeded, false otherwise. */ - def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = { + def putBytes[T: ClassTag]( + blockId: BlockId, + size: Long, + _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") if (memoryManager.acquireStorageMemory(blockId, size)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) - val entry = new SerializedMemoryEntry(bytes, size) + val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]]) entries.synchronized { entries.put(blockId, entry) } @@ -129,10 +140,11 @@ private[spark] class MemoryStore( * iterator or call `close()` on it in order to free the storage memory consumed by the * partially-unrolled block. */ - private[storage] def putIterator( + private[storage] def putIterator[T]( blockId: BlockId, - values: Iterator[Any], - level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = { + values: Iterator[T], + level: StorageLevel, + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") @@ -151,7 +163,7 @@ private[spark] class MemoryStore( // Keep track of unroll memory used by this particular block / putIterator() operation var unrollMemoryUsedByThisBlock = 0L // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[Any] + var vector = new SizeTrackingVector[T]()(classTag) // Request enough memory to begin unrolling keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) @@ -187,10 +199,10 @@ private[spark] class MemoryStore( val arrayValues = vector.toArray vector = null val entry = if (level.deserialized) { - new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) + new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) } else { - val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - new SerializedMemoryEntry(bytes, bytes.size) + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag) + new SerializedMemoryEntry[T](bytes, bytes.size, classTag) } val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { @@ -248,19 +260,21 @@ private[spark] class MemoryStore( val entry = entries.synchronized { entries.get(blockId) } entry match { case null => None - case e: DeserializedMemoryEntry => + case e: DeserializedMemoryEntry[_] => throw new IllegalArgumentException("should only call getBytes on serialized blocks") - case SerializedMemoryEntry(bytes, _) => Some(bytes) + case SerializedMemoryEntry(bytes, _, _) => Some(bytes) } } - def getValues(blockId: BlockId): Option[Iterator[Any]] = { + def getValues(blockId: BlockId): Option[Iterator[_]] = { val entry = entries.synchronized { entries.get(blockId) } entry match { case null => None - case e: SerializedMemoryEntry => + case e: SerializedMemoryEntry[_] => throw new IllegalArgumentException("should only call getValues on deserialized blocks") - case DeserializedMemoryEntry(values, _) => Some(values.iterator) + case DeserializedMemoryEntry(values, _, _) => + val x = Some(values) + x.map(_.iterator) } } @@ -333,6 +347,24 @@ private[spark] class MemoryStore( } } + def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = { + val data = entry match { + case DeserializedMemoryEntry(values, _, _) => Left(values) + case SerializedMemoryEntry(buffer, _, _) => Right(buffer) + } + val newEffectiveStorageLevel = + blockManager.dropFromMemory(blockId, () => data)(entry.classTag) + if (newEffectiveStorageLevel.isValid) { + // The block is still present in at least one store, so release the lock + // but don't delete the block info + blockManager.releaseLock(blockId) + } else { + // The block isn't present in any store, so delete the block info so that the + // block can be stored again + blockManager.blockInfoManager.removeBlock(blockId) + } + } + if (freedMemory >= space) { logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { @@ -341,20 +373,7 @@ private[spark] class MemoryStore( // blocks and removing entries. However the check is still here for // future safety. if (entry != null) { - val data = entry match { - case DeserializedMemoryEntry(values, _) => Left(values) - case SerializedMemoryEntry(buffer, _) => Right(buffer) - } - val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data) - if (newEffectiveStorageLevel.isValid) { - // The block is still present in at least one store, so release the lock - // but don't delete the block info - blockManager.releaseLock(blockId) - } else { - // The block isn't present in any store, so delete the block info so that the - // block can be stored again - blockManager.blockInfoManager.removeBlock(blockId) - } + dropBlock(blockId, entry) } } freedMemory @@ -470,16 +489,16 @@ private[spark] class MemoryStore( * @param unrolled an iterator for the partially-unrolled values. * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]]. */ -private[storage] class PartiallyUnrolledIterator( +private[storage] class PartiallyUnrolledIterator[T]( memoryStore: MemoryStore, unrollMemory: Long, - unrolled: Iterator[Any], - rest: Iterator[Any]) - extends Iterator[Any] { + unrolled: Iterator[T], + rest: Iterator[T]) + extends Iterator[T] { private[this] var unrolledIteratorIsConsumed: Boolean = false - private[this] var iter: Iterator[Any] = { - val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, { + private[this] var iter: Iterator[T] = { + val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, { unrolledIteratorIsConsumed = true memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) }) @@ -487,7 +506,7 @@ private[storage] class PartiallyUnrolledIterator( } override def hasNext: Boolean = iter.hasNext - override def next(): Any = iter.next() + override def next(): T = iter.next() /** * Called to dispose of this iterator and free its memory. diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 1c3f2bc315..2732cd6749 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.Matchers import org.scalatest.time.{Millis, Span} import org.apache.spark.storage.{RDDBlockId, StorageLevel} +import org.apache.spark.util.io.ChunkedByteBuffer class NotSerializableClass class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {} @@ -196,8 +197,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex blockManager.master.getLocations(blockId).foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, blockId.toString) - val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer()) - .asInstanceOf[Iterator[Int]].toList + val deserialized = blockManager.dataDeserialize[Int](blockId, + new ChunkedByteBuffer(bytes.nioByteBuffer())).toList assert(deserialized === (1 to 100).toList) } } @@ -222,7 +223,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val numPartitions = 10 val conf = new SparkConf() .set("spark.storage.unrollMemoryThreshold", "1024") - .set("spark.testing.memory", (size * numPartitions).toString) + .set("spark.testing.memory", size.toString) sc = new SparkContext(clusterUrl, "test", conf) val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY) assert(data.count() === size) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index fe83fc722a..7ee76aa4c6 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.implicitConversions +import scala.reflect.ClassTag import org.scalatest.BeforeAndAfterEach import org.scalatest.time.SpanSugar._ @@ -52,7 +53,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { } private def newBlockInfo(): BlockInfo = { - new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false) + new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false) } private def withTaskId[T](taskAttemptId: Long)(block: => T): T = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index b78a3648cd..98e8450fa1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.StorageLevel._ @@ -62,7 +62,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) - val store = new BlockManager(name, rpcEnv, master, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val store = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(store.memoryStore) store.initialize("app-id") @@ -262,7 +263,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo when(failableTransfer.hostName).thenReturn("some-hostname") when(failableTransfer.port).thenReturn(1000) val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1) - val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) memManager.setMemoryStore(failableStore.memoryStore) failableStore.initialize("app-id") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index edf5cd35e4..9419dfaa00 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ import scala.concurrent.Future import scala.language.implicitConversions import scala.language.postfixOps +import scala.reflect.ClassTag import org.mockito.{Matchers => mc} import org.mockito.Mockito.{mock, times, verify, when} @@ -40,7 +41,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.network.shuffle.BlockFetchingListener import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util._ @@ -77,7 +78,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val transfer = transferService .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1)) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) - val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(blockManager.memoryStore) blockManager.initialize("app-id") @@ -821,8 +823,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE maxOnHeapExecutionMemory = Long.MaxValue, maxStorageMemory = 1200, numCores = 1) + val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, - new JavaSerializer(conf), conf, memoryManager, mapOutputTracker, + serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memoryManager.setMemoryStore(store.memoryStore) @@ -1074,7 +1077,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with all the space in the world. This should succeed. - var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + var putResult = + memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => @@ -1085,7 +1089,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed after kicking out someBlock1. assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)) assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)) - putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + putResult = + memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) @@ -1099,7 +1104,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)) - putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY) + putResult = + memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY, ClassTag.Any) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) assert(putResult.isLeft) @@ -1121,8 +1127,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with plenty of space. This should succeed and cache both blocks. - val result1 = memoryStore.putIterator("b1", smallIterator, memOnly) - val result2 = memoryStore.putIterator("b2", smallIterator, memOnly) + val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any) + val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any) assert(memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(result1.isRight) // unroll was successful @@ -1137,7 +1143,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putIterator("b2", smallIterator, memOnly) // Unroll with not enough space. This should succeed but kick out b1 in the process. - val result3 = memoryStore.putIterator("b3", smallIterator, memOnly) + val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1147,7 +1153,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store.putIterator("b3", smallIterator, memOnly) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. - val result4 = memoryStore.putIterator("b4", bigIterator, memOnly) + val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, ClassTag.Any) assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) @@ -1175,7 +1181,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 - val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk) + val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1191,7 +1197,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // the block may be stored to disk. During the unrolling process, block "b2" should be kicked // out, so the memory store should contain only b3, while the disk store should contain // b1, b2 and b4. - val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk) + val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any) assert(result4.isLeft) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) @@ -1211,28 +1217,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // All unroll memory used is released because putIterator did not return an iterator - assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight) + assert(memoryStore.putIterator("b1", smallIterator, memOnly, ClassTag.Any).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight) + assert(memoryStore.putIterator("b2", smallIterator, memOnly, ClassTag.Any).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll memory is not released because putIterator returned an iterator // that still depends on the underlying vector used in the process - assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b3", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b4", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b5", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b6", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft) + assert(memoryStore.putIterator("b7", smallIterator, memOnly, ClassTag.Any).isLeft) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) @@ -1244,7 +1250,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val memoryStore = store.memoryStore val blockId = BlockId("rdd_3_10") store.blockInfoManager.lockNewBlockForWriting( - blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)) + blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)) memoryStore.putBytes(blockId, 13000, () => { fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") }) @@ -1340,7 +1346,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, - level: StorageLevel): Future[Unit] = { + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] = { import scala.concurrent.ExecutionContext.Implicits.global Future {} } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 13caa54d06..68e9c50d60 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -569,6 +569,9 @@ object MimaExcludes { if missing.map(_.fullName).sameElements(Seq("org.apache.spark.Logging")) => false case _ => true } + ) ++ Seq( + // [SPARK-13990] Automatically pick serializer when caching RDDs + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock") ) case v if v.startsWith("1.6") => Seq( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 8625882b04..ace67a639c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -161,7 +161,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } - blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead)) + .asInstanceOf[Iterator[T]] } if (partition.isBlockIdValid) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 76f67ed601..122ca0627f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -34,12 +34,13 @@ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ import org.apache.spark.util.{ManualClock, Utils} +import org.apache.spark.util.io.ChunkedByteBuffer class ReceivedBlockHandlerSuite extends SparkFunSuite @@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) val bytes = reader.read(fileSegment) reader.close() - blockManager.dataDeserialize(generateBlockId(), bytes).toList + blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList } loggedData shouldEqual data } @@ -264,7 +265,8 @@ class ReceivedBlockHandlerSuite name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(blockManager.memoryStore) blockManager.initialize("app-id") |