From 0eb11527d13083ced215e3fda44ed849198a57cb Mon Sep 17 00:00:00 2001
From: Manuel Laflamme
Date: Wed, 9 Jul 2014 10:45:45 -0700
Subject: [STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false

Fix QueueInputDStream which was not removing dequeued items when used with
the oneAtATime flag disabled.

Author: Manuel Laflamme

Closes #1285 from mlaflamm/spark-2343 and squashes the following commits:

61c9e38 [Manuel Laflamme] Unit tests for queue input stream
c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false
---
 .../apache/spark/streaming/InputStreamsSuite.scala | 92 +++++++++++++++++++++-
 1 file changed, 91 insertions(+), 1 deletion(-)

(limited to 'streaming/src/test')

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index cd0aa4d0dc..cc4a65011d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
 
 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.rdd.RDD
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
@@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
+
+  test("queue input stream - oneAtATime=true") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = true)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Setup data queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = input.map(Seq(_))
+    //Thread.sleep(1000)
+    val inputIterator = input.toIterator
+    for (i <- 0 until input.size) {
+      // Enqueue more than 1 item per tick but they should dequeue one at a time
+      inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+  test("queue input stream - oneAtATime=false") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = false)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Setup data queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
+
+    // Enqueue the first 3 items (one by one), they should be merged in the next batch
+    val inputIterator = input.toIterator
+    inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+
+    // Enqueue the remaining items (again one by one), merged in the final batch
+    inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
 }
-- 
cgit v1.2.3
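
Note: the view above is limited to 'streaming/src/test', so only the new
regression tests appear; the production change to QueueInputDStream itself
(commit c51d029) is not shown. For context, below is a minimal sketch of the
kind of fix the commit message describes, written against the general shape of
the Spark 1.x QueueInputDStream rather than the exact committed hunk: when
oneAtATime is disabled, compute() must drain the queue (e.g. via dequeueAll)
instead of only reading it, or the same RDDs are picked up again on every
subsequent batch.

    // Sketch of QueueInputDStream.compute() after the fix. Simplified and
    // reconstructed for illustration only: queue, oneAtATime, ssc and the
    // element type T come from the surrounding class in the Spark sources.
    override def compute(validTime: Time): Option[RDD[T]] = {
      val buffer = new ArrayBuffer[RDD[T]]()
      if (oneAtATime && queue.size > 0) {
        // Consume exactly one queued RDD per batch interval.
        buffer += queue.dequeue()
      } else {
        // The bug: the queue was read without removing its items, so every
        // later batch saw them again. dequeueAll both reads and removes.
        buffer ++= queue.dequeueAll(_ => true)
      }
      if (buffer.size > 0) {
        if (oneAtATime) {
          Some(buffer.head)
        } else {
          // With oneAtATime disabled, all RDDs queued since the previous
          // batch are merged into a single batch.
          Some(new UnionRDD(ssc.sc, buffer.toSeq))
        }
      } else {
        None
      }
    }

The second test above ("queue input stream - oneAtATime=false") exercises
exactly this path: items enqueued before a batch boundary must appear once,
merged into that batch, and never reappear in later batches.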