Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala  5  +++--
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala  3  ++-
2 files changed, 5 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 53fccd8d5e..0b2ec29813 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -120,7 +120,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     val blockId = partition.blockId

     def getBlockFromBlockManager(): Option[Iterator[T]] = {
-      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+      blockManager.get[T](blockId).map(_.data.asInstanceOf[Iterator[T]])
     }

     def getBlockFromWriteAheadLog(): Iterator[T] = {
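Why the explicit type argument in this hunk matters: with a ClassTag context bound, a bare blockManager.get(blockId) still compiles, because the compiler infers T = Nothing and materializes ClassTag[Nothing] for it. A minimal self-contained sketch of that pitfall (illustrative only, not Spark's BlockManager API):

import scala.reflect.ClassTag

object GetSketch {
  // Stand-in for a ClassTag-bound get, like the blockManager.get[T] call above.
  def get[T: ClassTag](id: String): Option[Iterator[T]] = {
    println(s"materializing $id as ${implicitly[ClassTag[T]].runtimeClass}")
    Some(Iterator.empty)
  }

  def main(args: Array[String]): Unit = {
    get("rdd_0_0")         // T inferred as Nothing: prints class scala.runtime.Nothing$
    get[String]("rdd_0_0") // explicit type argument: prints class java.lang.String
  }
}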
@@ -163,7 +163,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         dataRead.rewind()
       }
       serializerManager
-        .dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream())
+        .dataDeserializeStream(
+          blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag)
         .asInstanceOf[Iterator[T]]
     }
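The second hunk threads the RDD's own element ClassTag (the elementClassTag seen in the added line) through dataDeserializeStream, whose ClassTag travels in a separate parameter list. A rough sketch of that call shape, with assumed signatures rather than Spark's real ones:

import scala.reflect.ClassTag

object DeserializeSketch {
  // Assumed shape: the ClassTag is an explicit, curried parameter.
  def dataDeserializeStream[T](id: String, bytes: Array[Byte])
                              (tag: ClassTag[T]): Iterator[T] = {
    println(s"decoding $id as ${tag.runtimeClass}")
    Iterator.empty // a real implementation would decode `bytes`
  }

  // Mirrors how an RDD[T] can hand over its own ClassTag, as the
  // elementClassTag in the hunk above does.
  class FakeRdd[T: ClassTag] {
    def elementClassTag: ClassTag[T] = implicitly[ClassTag[T]]
    def read(): Iterator[T] =
      dataDeserializeStream("block-0", Array.emptyByteArray)(elementClassTag)
  }

  def main(args: Array[String]): Unit = {
    new FakeRdd[String].read() // decoding block-0 as class java.lang.String
  }
}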
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index feb5c30c6a..7e665454a5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.reflect.ClassTag

 import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -163,7 +164,7 @@ class ReceivedBlockHandlerSuite
       val bytes = reader.read(fileSegment)
       reader.close()
       serializerManager.dataDeserializeStream(
-        generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList
+        generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList
     }
     loggedData shouldEqual data
   }
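On the test side there is no statically known element type, so ClassTag.Any fills the new explicit parameter and the stream deserializes to values typed as Any. A small sketch under the same assumed signature as above, not the suite's real code:

import scala.reflect.ClassTag

object ClassTagAnySketch {
  def deserialize[T](bytes: Array[Byte])(tag: ClassTag[T]): Iterator[T] = {
    println(s"decoding as ${tag.runtimeClass}")
    Iterator.empty // decoding elided; only the ClassTag plumbing is shown
  }

  def main(args: Array[String]): Unit = {
    // ClassTag.Any fixes T = Any, matching the suite's usage above.
    val loggedData = deserialize(Array.emptyByteArray)(ClassTag.Any).toList
    println(loggedData)
  }
}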