diff options
Diffstat (limited to 'extras')
-rw-r--r-- | extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 6fe24fe811..78263f9dca 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -137,8 +137,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun // Verify that the generated KinesisBackedBlockRDD has the all the right information val blockInfos = Seq(blockInfo1, blockInfo2) val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos) - nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]] - val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] + nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]] + val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]] assert(kinesisRDD.regionName === dummyRegionName) assert(kinesisRDD.endpointUrl === dummyEndpointUrl) assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds) @@ -203,7 +203,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun Seconds(10), StorageLevel.MEMORY_ONLY, addFive, awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - stream shouldBe a [ReceiverInputDStream[Int]] + stream shouldBe a [ReceiverInputDStream[_]] val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] stream.foreachRDD { rdd => @@ -272,7 +272,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun times.foreach { time => val (arrayOfSeqNumRanges, data) = collectedData(time) val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]] - rdd shouldBe a [KinesisBackedBlockRDD[Array[Byte]]] + rdd shouldBe a [KinesisBackedBlockRDD[_]] // Verify the recovered sequence ranges val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] |