Diffstat (limited to 'streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 4e77cd6347..39d0de5179 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -157,7 +157,8 @@ class ReceivedBlockHandlerSuite
           val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
           val bytes = reader.read(fileSegment)
           reader.close()
-          serializerManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
+          serializerManager.dataDeserializeStream(
+            generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList
         }
         loggedData shouldEqual data
       }
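
The hunk above replaces serializerManager.dataDeserialize, which consumed a fully materialized ChunkedByteBuffer, with dataDeserializeStream, which reads from the InputStream returned by ChunkedByteBuffer.toInputStream(). The following is a minimal, self-contained sketch of that stream-based deserialization pattern using plain JDK serialization rather than Spark's SerializerManager; every name in it is illustrative and not taken from the patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object StreamDeserializeSketch {
  // Stand-in for the bytes a write-ahead log segment hands back in the test.
  def serialize(values: Seq[String]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    values.foreach(out.writeObject)
    out.close()
    buffer.toByteArray
  }

  // Pull objects off an InputStream one at a time and materialize them with
  // toList, mirroring the dataDeserializeStream(...).toList shape in the hunk.
  def deserializeStream(bytes: Array[Byte], count: Int): List[String] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try {
      (1 to count).map(_ => in.readObject().asInstanceOf[String]).toList
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("a", "b", "c")
    assert(deserializeStream(serialize(data), data.length) == data.toList)
  }
}

Handing the deserializer a stream leaves it to the caller to decide how much to materialize (here, everything via toList), instead of requiring a fully decoded buffer up front.
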
@@ -265,7 +266,7 @@ class ReceivedBlockHandlerSuite
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
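
The second hunk adds an explicit "localhost" argument when the test constructs its NettyBlockTransferService, matching a constructor that now takes the host name from the caller (apparently instead of resolving it inside the service) and keeping the test independent of whatever hostname the machine reports. Below is a minimal, self-contained sketch of the same idea using plain java.net rather than Spark's transfer service; the object and method names are illustrative only.

import java.net.{InetAddress, ServerSocket}

object ExplicitHostSketch {
  // Bind to a caller-supplied host instead of the machine's resolved hostname,
  // mirroring the explicit "localhost" argument added in the hunk above.
  def bindTestServer(host: String = "localhost"): ServerSocket =
    new ServerSocket(0, 50, InetAddress.getByName(host)) // port 0 picks a free ephemeral port

  def main(args: Array[String]): Unit = {
    val server = bindTestServer()
    try {
      println(s"bound to ${server.getInetAddress.getHostAddress}:${server.getLocalPort}")
    } finally {
      server.close()
    }
  }
}

Pinning the bind address to a caller-supplied value (defaulting to "localhost" for tests) avoids flaky behavior on hosts whose resolved name does not map to a reachable interface.
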