From 68ed3632c56389ab3ff4ea5d73c575f224dab4f6 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 9 Feb 2016 11:23:29 +0000
Subject: [SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as
 it is deprecated

Replace SynchronizedQueue with synchronized access to a Queue

Author: Sean Owen

Closes #11111 from srowen/SPARK-13170.
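For context: scala.collection.mutable.SynchronizedQueue has been deprecated since Scala 2.11, but StreamingContext.queueStream still takes a Scala mutable.Queue, so the change keeps a plain Queue and has the test's producer side lock on the queue's own monitor rather than switching to something like java.util.concurrent.ConcurrentLinkedQueue. Below is a minimal standalone sketch of that pattern; QueueMigrationSketch, enqueue, and drain are illustrative names, not part of this commit:

    import scala.collection.mutable

    object QueueMigrationSketch {
      // Before (deprecated since Scala 2.11):
      //   val queue = new mutable.SynchronizedQueue[String]()
      // After: a plain Queue, with every access guarded by the queue's monitor.
      private val queue = new mutable.Queue[String]()

      // Producer side: same shape as the test's queue.synchronized { queue += ... }
      def enqueue(item: String): Unit = queue.synchronized {
        queue += item
      }

      // Consumer side: drain everything under the same lock.
      def drain(): List[String] = queue.synchronized {
        val items = queue.toList
        queue.clear()
        items
      }

      def main(args: Array[String]): Unit = {
        Seq("1", "2", "3").foreach(enqueue)
        println(drain()) // List(1, 2, 3), FIFO order preserved
      }
    }

Locking on the queue object itself is the point: the consuming side, QueueInputDStream.compute, dequeues inside queue.synchronized as well, so producer and consumer agree on a single monitor.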
---
 .../apache/spark/streaming/InputStreamsSuite.scala | 37 ++++++++++++++++++++++++-------------
 1 file changed, 24 insertions(+), 13 deletions(-)

(limited to 'streaming/src/test')

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93c883362c..fa17b3a15c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
 import org.apache.spark.util.{ManualClock, Utils}
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // Feed data to the server to send to the network receiver
         val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
         val expectedOutput = input.map(_.toString)
-        for (i <- 0 until input.size) {
+        for (i <- input.indices) {
           testServer.send(input(i).toString + "\n")
           Thread.sleep(500)
           clock.advance(batchDuration.milliseconds)
@@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // Verify whether all the elements received are as expected
         // (whether the elements were received one in each interval is not verified)
         val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-        assert(output.size === expectedOutput.size)
-        for (i <- 0 until output.size) {
+        assert(output.length === expectedOutput.size)
+        for (i <- output.indices) {
           assert(output(i) === expectedOutput(i))
         }
       }
@@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = true)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -256,9 +255,13 @@
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
       val inputIterator = input.toIterator
 
-      for (i <- 0 until input.size) {
+      for (i <- input.indices) {
         // Enqueue more than 1 item per tick but they should dequeue one at a time
-        inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+        inputIterator.take(2).foreach { i =>
+          queue.synchronized {
+            queue += ssc.sparkContext.makeRDD(Seq(i))
+          }
+        }
         clock.advance(batchDuration.milliseconds)
       }
       Thread.sleep(1000)
@@ -281,13 +284,13 @@
 
   test("queue input stream - oneAtATime = false") {
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = false)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -298,12 +301,20 @@
 
       // Enqueue the first 3 items (one by one), they should be merged in the next batch
       val inputIterator = input.toIterator
-      inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.take(3).foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
 
       // Enqueue the remaining items (again one by one), merged in the final batch
-      inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
     }
--
cgit v1.2.3