From 8f35d3eac9268127512851e52864e64b0bae2f33 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 22 Feb 2016 09:44:32 +0000
Subject: [SPARK-13186][STREAMING] migrate away from SynchronizedMap trait

SynchronizedMap in package mutable is deprecated: Synchronization via traits
is deprecated as it is inherently unreliable. Change to
java.util.concurrent.ConcurrentHashMap instead.

Author: Huaxin Gao

Closes #11250 from huaxingao/spark__13186.
---
 .../streaming/kinesis/KinesisStreamSuite.scala     | 38 ++++++++++++----------
 1 file changed, 21 insertions(+), 17 deletions(-)

diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0390..ca5d13da46 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -230,7 +230,6 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     val awsCredentials = KinesisTestUtils.getAWSCredentials()
     val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
 
     val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
       testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +240,16 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
       val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      collectedData.synchronized {
+        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      }
     })
 
     ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
     ssc.start()
 
-    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+    def numBatchesWithData: Int =
+      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
 
     def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
 
@@ -268,21 +270,23 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
     // and return the same data
-    val times = collectedData.keySet
-    times.foreach { time =>
-      val (arrayOfSeqNumRanges, data) = collectedData(time)
-      val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
-      rdd shouldBe a [KinesisBackedBlockRDD[_]]
-
-      // Verify the recovered sequence ranges
-      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
-      assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
-      arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
-        assert(expected.ranges.toSeq === found.ranges.toSeq)
+    collectedData.synchronized {
+      val times = collectedData.keySet
+      times.foreach { time =>
+        val (arrayOfSeqNumRanges, data) = collectedData(time)
+        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+        rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+        // Verify the recovered sequence ranges
+        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+          assert(expected.ranges.toSeq === found.ranges.toSeq)
+        }
+
+        // Verify the recovered data
+        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
       }
-
-      // Verify the recovered data
-      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
     }
     ssc.stop()
   }
--
cgit v1.2.3
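For reference, the pattern at issue: mixing in mutable.SynchronizedMap wraps each map method in a lock via a trait, which is deprecated because anything a subclass overrides, and any compound read-then-write sequence, is still not atomic. Below is a minimal sketch of the two replacements: explicit synchronized blocks on a plain HashMap, as this patch does, and java.util.concurrent.ConcurrentHashMap, as the deprecation message suggests. The object, map, and method names are illustrative only, not taken from the Spark codebase.

import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable

object SynchronizedMapMigration {

  // Before (deprecated, removed by this patch): synchronization mixed in via a trait.
  // val collected = new mutable.HashMap[String, Int]
  //   with mutable.SynchronizedMap[String, Int]

  // Option 1 -- what this diff does: keep a plain mutable map and wrap every
  // access, including compound read-modify sequences, in an explicit lock on
  // the map itself.
  private val collected = new mutable.HashMap[String, Int]

  def record(key: String, value: Int): Unit = collected.synchronized {
    collected(key) = value
  }

  def nonZeroCount: Int = collected.synchronized {
    collected.count(_._2 != 0)
  }

  // Option 2 -- what the commit message suggests: ConcurrentHashMap makes each
  // individual operation thread-safe without external locking.
  private val concurrentCollected = new ConcurrentHashMap[String, Integer]()

  def recordConcurrent(key: String, value: Int): Unit =
    concurrentCollected.put(key, value)
}

The patch keeps a plain HashMap and locks around whole compound sequences (iterate the key set, then read each value). A ConcurrentHashMap would make the individual puts and gets thread-safe on its own, but it would not make that multi-step verification atomic, so explicit synchronized blocks fit this test's access pattern.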