path: root/external/kafka
author     Holden Karau <holden@us.ibm.com>    2016-02-09 08:44:56 +0000
committer  Sean Owen <sowen@cloudera.com>      2016-02-09 08:44:56 +0000
commit     159198eff67ee9ead08fba60a585494ea1575147 (patch)
tree       b45f653a253ce0aac7af561caaeef9ea52099e1a /external/kafka
parent     f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff)
download   spark-159198eff67ee9ead08fba60a585494ea1575147.tar.gz
           spark-159198eff67ee9ead08fba60a585494ea1575147.tar.bz2
           spark-159198eff67ee9ead08fba60a585494ea1575147.zip
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 produces the warning "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative". We already use ConcurrentLinkedQueue elsewhere, so let's replace it.

Some notes on how the behaviour differs, for reviewers: the Seq implicitly converted from a SynchronizedBuffer would continue to receive updates; when the same conversion is done explicitly on a ConcurrentLinkedQueue, this is not the case. Hence some of the (internal & test) APIs are changed to pass an Iterable instead. toSeq is safe to use only once there are no more updates.

Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>

Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
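The replacement pattern the patch applies can be sketched outside the diff as follows. This is a minimal, self-contained Scala example, not code from the commit; the object name and the sample data are invented for illustration.

    import java.util.Arrays
    import java.util.concurrent.ConcurrentLinkedQueue

    import scala.collection.JavaConverters._

    object SynchronizedBufferReplacementSketch {
      def main(args: Array[String]): Unit = {
        // Before (deprecated under Scala 2.11):
        //   val received = new ArrayBuffer[String] with mutable.SynchronizedBuffer[String]
        //   received ++= batch
        // After: a thread-safe queue from java.util.concurrent.
        val received = new ConcurrentLinkedQueue[String]()

        // Writers (for example a foreachRDD callback) append with addAll.
        val batch = Array("a", "b", "c")
        received.addAll(Arrays.asList(batch: _*))

        // Readers go through asScala. The wrapper is a live view over the queue,
        // so, as the commit message notes, call toSeq only once no further
        // updates are expected.
        assert(received.asScala.toSeq == Seq("a", "b", "c"))
        println(received.asScala.mkString("\n"))
      }
    }

Passing the collected data around as an Iterable (here, the result of asScala) rather than forcing it to a Seq leaves it to the caller to decide when it is safe to snapshot.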
Diffstat (limited to 'external/kafka')
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 43
1 file changed, 21 insertions(+), 22 deletions(-)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 655b161734..8398178e9b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -18,10 +18,11 @@
package org.apache.spark.streaming.kafka
import java.io.File
+import java.util.Arrays
import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -101,8 +102,7 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, topics)
}
- val allReceived =
- new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]()
// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
@@ -131,11 +131,12 @@ class DirectKafkaStreamSuite
assert(partSize === rangeSize, "offset ranges are wrong")
}
}
- stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+ stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
- "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
@@ -173,8 +174,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)
- val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
- stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -219,8 +220,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)
- val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
- stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -265,7 +266,7 @@ class DirectKafkaStreamSuite
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
- DirectKafkaStreamSuite.collectedData.appendAll(data)
+ DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
}
// This is ensure all the data is eventually receiving only once
@@ -335,14 +336,14 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, Set(topic))
}
- val allReceived =
- new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]
- stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+ stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
- "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
// Calculate all the record number collected in the StreamingListener.
assert(collector.numRecordsSubmitted.get() === totalSent)
@@ -389,17 +390,16 @@ class DirectKafkaStreamSuite
}
}
- val collectedData =
- new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
+ val collectedData = new ConcurrentLinkedQueue[Array[String]]()
// Used for assertion failure messages.
def dataToString: String =
- collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+ collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
- collectedData += data
+ collectedData.add(data)
}
ssc.start()
@@ -415,7 +415,7 @@ class DirectKafkaStreamSuite
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
- assert(collectedData.exists(_.size == expectedSize),
+ assert(collectedData.asScala.exists(_.size == expectedSize),
s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
}
}
@@ -433,7 +433,7 @@ class DirectKafkaStreamSuite
}
object DirectKafkaStreamSuite {
- val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+ val collectedData = new ConcurrentLinkedQueue[String]()
@volatile var total = -1L
class InputInfoCollector extends StreamingListener {
@@ -468,4 +468,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}
-