diff options
author | Huaxin Gao <huaxing@us.ibm.com> | 2016-02-22 09:44:32 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-02-22 09:44:32 +0000 |
commit | 8f35d3eac9268127512851e52864e64b0bae2f33 (patch) | |
tree | 213baa664403ecf7d0ea53c5de11da63f9dd0322 /external | |
parent | 39ff15457026767a4d9ff191174fc85e7907f489 (diff) | |
download | spark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.gz spark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.bz2 spark-8f35d3eac9268127512851e52864e64b0bae2f33.zip |
[SPARK-13186][STREAMING] migrate away from SynchronizedMap
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead.
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes #11250 from huaxingao/spark__13186.
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 797b07f80d..6a35ac14a8 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) - val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long] + val result = new mutable.HashMap[String, Long]() stream.map(_._2).countByValue().foreachRDD { r => - val ret = r.collect() - ret.toMap.foreach { kv => - val count = result.getOrElseUpdate(kv._1, 0) + kv._2 - result.put(kv._1, count) + r.collect().foreach { kv => + result.synchronized { + val count = result.getOrElseUpdate(kv._1, 0) + kv._2 + result.put(kv._1, count) + } } } ssc.start() eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert(sent === result) + assert(result.synchronized { sent === result }) } } } |