From 159198eff67ee9ead08fba60a585494ea1575147 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 9 Feb 2016 08:44:56 +0000 Subject: [SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming Building with Scala 2.11 results in the warning trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative - we already use ConcurrentLinkedQueue elsewhere so lets replace it. Some notes about how behaviour is different for reviewers: The Seq from a SynchronizedBuffer that was implicitly converted would continue to receive updates - however when we do the same conversion explicitly on the ConcurrentLinkedQueue this isn't the case. Hence changing some of the (internal & test) APIs to pass an Iterable. toSeq is safe to use if there are no more updates. Author: Holden Karau Author: tedyu Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming. --- .../org/apache/spark/streaming/TestOutputStream.scala | 5 +++-- .../spark/streaming/flume/FlumePollingStreamSuite.scala | 13 ++++++------- .../apache/spark/streaming/flume/FlumeStreamSuite.scala | 16 ++++++++-------- 3 files changed, 17 insertions(+), 17 deletions(-) (limited to 'external/flume/src') diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala index 57374ef515..fc02c9fcb5 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{IOException, ObjectInputStream} +import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -33,10 +34,10 @@ import org.apache.spark.util.Utils * The buffer contains a sequence of RDD's, each containing a sequence of items */ class TestOutputStream[T: ClassTag](parent: DStream[T], - val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) + val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() - output += collected + output.add(collected) }, false) { // This is to clear the output buffer every it is read from a checkpoint diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 60db846ffb..10dcbf98bc 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -18,9 +18,9 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps @@ -102,9 +102,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, utils.eventsPerBatch, 5) - val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] - with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputQueue) outputStream.register() ssc.start() @@ -115,11 +114,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log // The eventually is required to ensure that all data in the batch has been processed. eventually(timeout(10 seconds), interval(100 milliseconds)) { - val flattenOutputBuffer = outputBuffer.flatten - val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map { + val flattenOutput = outputQueue.asScala.toSeq.flatten + val headers = flattenOutput.map(_.event.getHeaders.asScala.map { case (key, value) => (key.toString, value.toString) }).map(_.asJava) - val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody)) + val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody)) utils.assertOutput(headers.asJava, bodies.asJava) } } finally { diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index b29e591c07..38208c6518 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -17,8 +17,9 @@ package org.apache.spark.streaming.flume +import java.util.concurrent.ConcurrentLinkedQueue + import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps @@ -51,14 +52,14 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w val input = (1 to 100).map { _.toString } val utils = new FlumeTestUtils try { - val outputBuffer = startContext(utils.getTestPort(), testCompression) + val outputQueue = startContext(utils.getTestPort(), testCompression) eventually(timeout(10 seconds), interval(100 milliseconds)) { utils.writeInput(input.asJava, testCompression) } eventually(timeout(10 seconds), interval(100 milliseconds)) { - val outputEvents = outputBuffer.flatten.map { _.event } + val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event } outputEvents.foreach { event => event.getHeaders.get("test") should be("header") @@ -76,16 +77,15 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w /** Setup and start the streaming context */ private def startContext( - testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = { + testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = { ssc = new StreamingContext(conf, Milliseconds(200)) val flumeStream = FlumeUtils.createStream( ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression) - val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] - with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputQueue) outputStream.register() ssc.start() - outputBuffer + outputQueue } /** Class to create socket channel with compression */ -- cgit v1.2.3