aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorHuaxin Gao <huaxing@us.ibm.com>2016-02-22 09:44:32 +0000
committerSean Owen <sowen@cloudera.com>2016-02-22 09:44:32 +0000
commit8f35d3eac9268127512851e52864e64b0bae2f33 (patch)
tree213baa664403ecf7d0ea53c5de11da63f9dd0322 /external
parent39ff15457026767a4d9ff191174fc85e7907f489 (diff)
downloadspark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.gz
spark-8f35d3eac9268127512851e52864e64b0bae2f33.tar.bz2
spark-8f35d3eac9268127512851e52864e64b0bae2f33.zip
[SPARK-13186][STREAMING] migrate away from SynchronizedMap
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead. Author: Huaxin Gao <huaxing@us.ibm.com> Closes #11250 from huaxingao/spark__13186.
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala13
1 files changed, 7 insertions, 6 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 797b07f80d..6a35ac14a8 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
+ val result = new mutable.HashMap[String, Long]()
stream.map(_._2).countByValue().foreachRDD { r =>
- val ret = r.collect()
- ret.toMap.foreach { kv =>
- val count = result.getOrElseUpdate(kv._1, 0) + kv._2
- result.put(kv._1, count)
+ r.collect().foreach { kv =>
+ result.synchronized {
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
}
}
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- assert(sent === result)
+ assert(result.synchronized { sent === result })
}
}
}