From 31d069d4c2956306355d14087ca74ce1e6705217 Mon Sep 17 00:00:00 2001 From: Wilson Wu Date: Mon, 14 Mar 2016 09:13:29 +0000 Subject: [SPARK-13746][TESTS] stop using deprecated SynchronizedSet trait SynchronizedSet in package mutable is deprecated Author: Wilson Wu Closes #11580 from wilson888888888/spark-synchronizedset. --- .../streaming/kinesis/KinesisStreamSuite.scala | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'external') diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index ca5d13da46..4460b6bcca 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun Seconds(10), StorageLevel.MEMORY_ONLY, awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + val collected = new mutable.HashSet[Int] stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) + collected.synchronized { + collected ++= rdd.collect() + logInfo("Collected = " + collected.mkString(", ")) + } } ssc.start() val testData = 1 to 10 eventually(timeout(120 seconds), interval(10 second)) { testUtils.pushData(testData, aggregateTestData) - assert(collected === testData.toSet, "\nData received does not match data sent") + assert(collected.synchronized { collected === testData.toSet }, + "\nData received does not match data sent") } ssc.stop(stopSparkContext = false) } @@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun stream shouldBe a [ReceiverInputDStream[_]] - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + val collected = new mutable.HashSet[Int] stream.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) + collected.synchronized { + collected ++= rdd.collect() + logInfo("Collected = " + collected.mkString(", ")) + } } ssc.start() @@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun eventually(timeout(120 seconds), interval(10 second)) { testUtils.pushData(testData, aggregateTestData) val modData = testData.map(_ + 5) - assert(collected === modData.toSet, "\nData received does not match data sent") + assert(collected.synchronized { collected === modData.toSet }, + "\nData received does not match data sent") } ssc.stop(stopSparkContext = false) } -- cgit v1.2.3