diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 3 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 8 |
2 files changed, 7 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 8625882b04..ace67a639c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -161,7 +161,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } - blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] + blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead)) + .asInstanceOf[Iterator[T]] } if (partition.isBlockIdValid) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 76f67ed601..122ca0627f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -34,12 +34,13 @@ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ import org.apache.spark.util.{ManualClock, Utils} +import org.apache.spark.util.io.ChunkedByteBuffer class ReceivedBlockHandlerSuite extends SparkFunSuite @@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) val bytes = reader.read(fileSegment) reader.close() - blockManager.dataDeserialize(generateBlockId(), bytes).toList + blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList } loggedData shouldEqual data } @@ -264,7 +265,8 @@ class ReceivedBlockHandlerSuite name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, + val serializerManager = new SerializerManager(serializer, conf) + val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(blockManager.memoryStore) blockManager.initialize("app-id") |