aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorWilson Wu <wilson888888888@gmail.com>2016-03-14 09:13:29 +0000
committerSean Owen <sowen@cloudera.com>2016-03-14 09:13:29 +0000
commit31d069d4c2956306355d14087ca74ce1e6705217 (patch)
treebbbcfa1eca22c761eba73be7e91d554a6a332093 /external
parentacdf21970334cea9d6cfc287e4ccb8e72de9dee1 (diff)
downloadspark-31d069d4c2956306355d14087ca74ce1e6705217.tar.gz
spark-31d069d4c2956306355d14087ca74ce1e6705217.tar.bz2
spark-31d069d4c2956306355d14087ca74ce1e6705217.zip
[SPARK-13746][TESTS] stop using deprecated SynchronizedSet
trait SynchronizedSet in package mutable is deprecated Author: Wilson Wu <wilson888888888@gmail.com> Closes #11580 from wilson888888888/spark-synchronizedset.
Diffstat (limited to 'external')
-rw-r--r--external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ca5d13da46..4460b6bcca 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -180,17 +180,20 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ val collected = new mutable.HashSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + collected.mkString(", "))
+ collected.synchronized {
+ collected ++= rdd.collect()
+ logInfo("Collected = " + collected.mkString(", "))
+ }
}
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
- assert(collected === testData.toSet, "\nData received does not match data sent")
+ assert(collected.synchronized { collected === testData.toSet },
+ "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
@@ -205,10 +208,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
stream shouldBe a [ReceiverInputDStream[_]]
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ val collected = new mutable.HashSet[Int]
stream.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + collected.mkString(", "))
+ collected.synchronized {
+ collected ++= rdd.collect()
+ logInfo("Collected = " + collected.mkString(", "))
+ }
}
ssc.start()
@@ -216,7 +221,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
- assert(collected === modData.toSet, "\nData received does not match data sent")
+ assert(collected.synchronized { collected === modData.toSet },
+ "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}