diff options
author | Holden Karau <holden@us.ibm.com> | 2016-02-09 08:44:56 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-02-09 08:44:56 +0000 |
commit | 159198eff67ee9ead08fba60a585494ea1575147 (patch) | |
tree | b45f653a253ce0aac7af561caaeef9ea52099e1a /external/kafka | |
parent | f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff) | |
download | spark-159198eff67ee9ead08fba60a585494ea1575147.tar.gz spark-159198eff67ee9ead08fba60a585494ea1575147.tar.bz2 spark-159198eff67ee9ead08fba60a585494ea1575147.zip |
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 results in the warning trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative - we already use ConcurrentLinkedQueue elsewhere so lets replace it.
Some notes about how behaviour is different for reviewers:
The Seq from a SynchronizedBuffer that was implicitly converted would continue to receive updates - however when we do the same conversion explicitly on the ConcurrentLinkedQueue this isn't the case. Hence changing some of the (internal & test) APIs to pass an Iterable. toSeq is safe to use if there are no more updates.
Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>
Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
Diffstat (limited to 'external/kafka')
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 655b161734..8398178e9b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -18,10 +18,11 @@ package org.apache.spark.streaming.kafka import java.io.File +import java.util.Arrays import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps @@ -101,8 +102,7 @@ class DirectKafkaStreamSuite ssc, kafkaParams, topics) } - val allReceived = - new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] + val allReceived = new ConcurrentLinkedQueue[(String, String)]() // hold a reference to the current offset ranges, so it can be used downstream var offsetRanges = Array[OffsetRange]() @@ -131,11 +131,12 @@ class DirectKafkaStreamSuite assert(partSize === rangeSize, "offset ranges are wrong") } } - stream.foreachRDD { rdd => allReceived ++= rdd.collect() } + stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, - "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) + "didn't get expected number of messages, messages:\n" + + allReceived.asScala.mkString("\n")) } ssc.stop() } @@ -173,8 +174,8 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] - stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } + val collectedData = new ConcurrentLinkedQueue[String]() + stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -219,8 +220,8 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] - stream.foreachRDD { rdd => collectedData ++= rdd.collect() } + val collectedData = new ConcurrentLinkedQueue[String]() + stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -265,7 +266,7 @@ class DirectKafkaStreamSuite // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - DirectKafkaStreamSuite.collectedData.appendAll(data) + DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*)) } // This is ensure all the data is eventually receiving only once @@ -335,14 +336,14 @@ class DirectKafkaStreamSuite ssc, kafkaParams, Set(topic)) } - val allReceived = - new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] + val allReceived = new ConcurrentLinkedQueue[(String, String)] - stream.foreachRDD { rdd => allReceived ++= rdd.collect() } + stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, - "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) + "didn't get expected number of messages, messages:\n" + + allReceived.asScala.mkString("\n")) // Calculate all the record number collected in the StreamingListener. assert(collector.numRecordsSubmitted.get() === totalSent) @@ -389,17 +390,16 @@ class DirectKafkaStreamSuite } } - val collectedData = - new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] + val collectedData = new ConcurrentLinkedQueue[Array[String]]() // Used for assertion failure messages. def dataToString: String = - collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") + collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - collectedData += data + collectedData.add(data) } ssc.start() @@ -415,7 +415,7 @@ class DirectKafkaStreamSuite eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. - assert(collectedData.exists(_.size == expectedSize), + assert(collectedData.asScala.exists(_.size == expectedSize), s" - No arrays of size $expectedSize for rate $rate found in $dataToString") } } @@ -433,7 +433,7 @@ class DirectKafkaStreamSuite } object DirectKafkaStreamSuite { - val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] + val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L class InputInfoCollector extends StreamingListener { @@ -468,4 +468,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long) processingDelay: Long, schedulingDelay: Long): Option[Double] = Some(rate) } - |