aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorHuaxin Gao <huaxing@us.ibm.com>2016-02-22 09:44:32 +0000
committerSean Owen <sowen@cloudera.com>2016-02-22 09:44:32 +0000
commit8f35d3eac9268127512851e52864e64b0bae2f33 (patch)
tree213baa664403ecf7d0ea53c5de11da63f9dd0322 /extras
parent39ff15457026767a4d9ff191174fc85e7907f489 (diff)
downloadspark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.gz
spark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.bz2
spark-8f35d3eac9268127512851e52864e64b0bae2f33.zip
[SPARK-13186][STREAMING] migrate away from SynchronizedMap
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead. Author: Huaxin Gao <huaxing@us.ibm.com> Closes #11250 from huaxingao/spark__13186.
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala38
1 file changed, 21 insertions, 17 deletions
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0390..ca5d13da46 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -230,7 +230,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
- with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +240,16 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
- collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ collectedData.synchronized {
+ collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ }
})
ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
ssc.start()
- def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+ def numBatchesWithData: Int =
+ collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
@@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
// and return the same data
- val times = collectedData.keySet
- times.foreach { time =>
- val (arrayOfSeqNumRanges, data) = collectedData(time)
- val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
- rdd shouldBe a [KinesisBackedBlockRDD[_]]
-
- // Verify the recovered sequence ranges
- val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
- assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
- arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
- assert(expected.ranges.toSeq === found.ranges.toSeq)
+ collectedData.synchronized {
+ val times = collectedData.keySet
+ times.foreach { time =>
+ val (arrayOfSeqNumRanges, data) = collectedData(time)
+ val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+ rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+ // Verify the recovered sequence ranges
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+ assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+ arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+ assert(expected.ranges.toSeq === found.ranges.toSeq)
+ }
+
+ // Verify the recovered data
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
-
- // Verify the recovered data
- assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
ssc.stop()
}