about summary refs log tree commit diff
path: root/extras
diff options
context:
space:
mode:
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala38
1 file changed, 21 insertions(+), 17 deletions(-)
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0390..ca5d13da46 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -230,7 +230,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
- with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +240,16 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
- collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ collectedData.synchronized {
+ collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ }
})
ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
ssc.start()
- def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+ def numBatchesWithData: Int =
+ collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
@@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
// and return the same data
- val times = collectedData.keySet
- times.foreach { time =>
- val (arrayOfSeqNumRanges, data) = collectedData(time)
- val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
- rdd shouldBe a [KinesisBackedBlockRDD[_]]
-
- // Verify the recovered sequence ranges
- val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
- assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
- arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
- assert(expected.ranges.toSeq === found.ranges.toSeq)
+ collectedData.synchronized {
+ val times = collectedData.keySet
+ times.foreach { time =>
+ val (arrayOfSeqNumRanges, data) = collectedData(time)
+ val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+ rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+ // Verify the recovered sequence ranges
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+ assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+ arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+ assert(expected.ranges.toSeq === found.ranges.toSeq)
+ }
+
+ // Verify the recovered data
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
-
- // Verify the recovered data
- assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
ssc.stop()
}