aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala5
1 files changed, 3 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 53fccd8d5e..0b2ec29813 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -120,7 +120,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val blockId = partition.blockId
def getBlockFromBlockManager(): Option[Iterator[T]] = {
- blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+ blockManager.get[T](blockId).map(_.data.asInstanceOf[Iterator[T]])
}
def getBlockFromWriteAheadLog(): Iterator[T] = {
@@ -163,7 +163,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
dataRead.rewind()
}
serializerManager
- .dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream())
+ .dataDeserializeStream(
+ blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag)
.asInstanceOf[Iterator[T]]
}