author     Holden Karau <holden@us.ibm.com>    2016-02-09 08:44:56 +0000
committer  Sean Owen <sowen@cloudera.com>      2016-02-09 08:44:56 +0000
commit     159198eff67ee9ead08fba60a585494ea1575147 (patch)
tree       b45f653a253ce0aac7af561caaeef9ea52099e1a /external
parent     f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff)
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 produces the warning: "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative." We already use ConcurrentLinkedQueue elsewhere, so let's replace it here as well.

Some notes for reviewers on how the behaviour differs: a Seq obtained implicitly from a SynchronizedBuffer would continue to receive updates, whereas the explicit conversion from a ConcurrentLinkedQueue does not. Hence some of the (internal & test) APIs now pass an Iterable instead; toSeq is safe to use once there are no more updates.

Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>

Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
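The snippet below is a minimal, self-contained sketch of that behavioural note (it is not part of the patch, and the object and value names are purely illustrative): an ArrayBuffer handed around as a Seq stays live and keeps reflecting later appends, while an explicit copy taken from a ConcurrentLinkedQueue is a fixed snapshot, so it should only be taken once no more updates are expected.

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Illustrative only -- not from the patch.
object BufferVsQueueDemo {
  def main(args: Array[String]): Unit = {
    // An ArrayBuffer is itself a scala.collection.Seq, so passing it where a
    // Seq is expected hands over the live collection: later appends are visible.
    val buffer = ArrayBuffer[String]("a", "b")
    val liveView: scala.collection.Seq[String] = buffer
    buffer += "c"
    println(liveView.mkString(","))             // prints a,b,c

    // A ConcurrentLinkedQueue must be converted explicitly; copying it out
    // produces a snapshot that does not see later additions.
    val queue = new ConcurrentLinkedQueue[String]()
    queue.add("a")
    queue.add("b")
    val snapshot: List[String] = queue.asScala.toList
    queue.add("c")
    println(snapshot.mkString(","))             // prints a,b
    // Convert again once all updates have arrived ("toSeq is safe to use if
    // there are no more updates").
    println(queue.asScala.toList.mkString(",")) // prints a,b,c
  }
}

This is why the patch changes some of the internal and test APIs to pass an Iterable and only converts to a Seq after all expected output has been received.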
Diffstat (limited to 'external')
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala              |  5
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 13
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala        | 16
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala  | 43
4 files changed, 38 insertions, 39 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 57374ef515..fc02c9fcb5 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -33,10 +34,10 @@ import org.apache.spark.util.Utils
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
+ val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
- output += collected
+ output.add(collected)
}, false) {
// This is to clear the output buffer every time it is read from a checkpoint
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 60db846ffb..10dcbf98bc 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,9 +18,9 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -102,9 +102,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
utils.eventsPerBatch, 5)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
@@ -115,11 +114,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
// The eventually is required to ensure that all data in the batch has been processed.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutputBuffer = outputBuffer.flatten
- val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
+ val flattenOutput = outputQueue.asScala.toSeq.flatten
+ val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
case (key, value) => (key.toString, value.toString)
}).map(_.asJava)
- val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody))
+ val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index b29e591c07..38208c6518 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.streaming.flume
+import java.util.concurrent.ConcurrentLinkedQueue
+
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -51,14 +52,14 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val input = (1 to 100).map { _.toString }
val utils = new FlumeTestUtils
try {
- val outputBuffer = startContext(utils.getTestPort(), testCompression)
+ val outputQueue = startContext(utils.getTestPort(), testCompression)
eventually(timeout(10 seconds), interval(100 milliseconds)) {
utils.writeInput(input.asJava, testCompression)
}
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputBuffer.flatten.map { _.event }
+ val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
@@ -76,16 +77,15 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
/** Setup and start the streaming context */
private def startContext(
- testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+ testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
ssc = new StreamingContext(conf, Milliseconds(200))
val flumeStream = FlumeUtils.createStream(
ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
- outputBuffer
+ outputQueue
}
/** Class to create socket channel with compression */
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 655b161734..8398178e9b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -18,10 +18,11 @@
package org.apache.spark.streaming.kafka
import java.io.File
+import java.util.Arrays
import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -101,8 +102,7 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, topics)
}
- val allReceived =
- new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]()
// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
@@ -131,11 +131,12 @@ class DirectKafkaStreamSuite
assert(partSize === rangeSize, "offset ranges are wrong")
}
}
- stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+ stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
- "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
@@ -173,8 +174,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)
- val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
- stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -219,8 +220,8 @@ class DirectKafkaStreamSuite
"Start offset not from latest"
)
- val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
- stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ val collectedData = new ConcurrentLinkedQueue[String]()
+ stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
@@ -265,7 +266,7 @@ class DirectKafkaStreamSuite
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
- DirectKafkaStreamSuite.collectedData.appendAll(data)
+ DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
}
// This is to ensure all the data is eventually received only once
@@ -335,14 +336,14 @@ class DirectKafkaStreamSuite
ssc, kafkaParams, Set(topic))
}
- val allReceived =
- new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+ val allReceived = new ConcurrentLinkedQueue[(String, String)]
- stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+ stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
- "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+ "didn't get expected number of messages, messages:\n" +
+ allReceived.asScala.mkString("\n"))
// Calculate all the record number collected in the StreamingListener.
assert(collector.numRecordsSubmitted.get() === totalSent)
@@ -389,17 +390,16 @@ class DirectKafkaStreamSuite
}
}
- val collectedData =
- new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
+ val collectedData = new ConcurrentLinkedQueue[Array[String]]()
// Used for assertion failure messages.
def dataToString: String =
- collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+ collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
- collectedData += data
+ collectedData.add(data)
}
ssc.start()
@@ -415,7 +415,7 @@ class DirectKafkaStreamSuite
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
- assert(collectedData.exists(_.size == expectedSize),
+ assert(collectedData.asScala.exists(_.size == expectedSize),
s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
}
}
@@ -433,7 +433,7 @@ class DirectKafkaStreamSuite
}
object DirectKafkaStreamSuite {
- val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+ val collectedData = new ConcurrentLinkedQueue[String]()
@volatile var total = -1L
class InputInfoCollector extends StreamingListener {
@@ -468,4 +468,3 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}
-