diff options
author | Wilson Wu <wilson888888888@gmail.com> | 2016-03-14 09:13:29 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-14 09:13:29 +0000 |
commit | 31d069d4c2956306355d14087ca74ce1e6705217 (patch) | |
tree | bbbcfa1eca22c761eba73be7e91d554a6a332093 /external/kinesis-asl | |
parent | acdf21970334cea9d6cfc287e4ccb8e72de9dee1 (diff) | |
download | spark-31d069d4c2956306355d14087ca74ce1e6705217.tar.gz spark-31d069d4c2956306355d14087ca74ce1e6705217.tar.bz2 spark-31d069d4c2956306355d14087ca74ce1e6705217.zip |
[SPARK-13746][TESTS] stop using deprecated SynchronizedSet
trait SynchronizedSet in package mutable is deprecated
Author: Wilson Wu <wilson888888888@gmail.com>
Closes #11580 from wilson888888888/spark-synchronizedset.
Diffstat (limited to 'external/kinesis-asl')
-rw-r--r-- | external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index ca5d13da46..4460b6bcca 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun Seconds(10), StorageLevel.MEMORY_ONLY, awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + val collected = new mutable.HashSet[Int] stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) + collected.synchronized { + collected ++= rdd.collect() + logInfo("Collected = " + collected.mkString(", ")) + } } ssc.start() val testData = 1 to 10 eventually(timeout(120 seconds), interval(10 second)) { testUtils.pushData(testData, aggregateTestData) - assert(collected === testData.toSet, "\nData received does not match data sent") + assert(collected.synchronized { collected === testData.toSet }, + "\nData received does not match data sent") } ssc.stop(stopSparkContext = false) } @@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun stream shouldBe a [ReceiverInputDStream[_]] - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + val collected = new mutable.HashSet[Int] stream.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) + collected.synchronized { + collected ++= rdd.collect() + logInfo("Collected = " + collected.mkString(", ")) + } } ssc.start() @@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun eventually(timeout(120 seconds), interval(10 second)) { testUtils.pushData(testData, aggregateTestData) val modData = testData.map(_ + 5) - assert(collected === modData.toSet, "\nData received does not match data sent") + assert(collected.synchronized { collected === modData.toSet }, + "\nData received does not match data sent") } ssc.stop(stopSparkContext = false) } |