diff options
author | Manuel Laflamme <manuel.laflamme@gmail.com> | 2014-07-09 10:45:45 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-07-09 10:45:45 -0700 |
commit | 0eb11527d13083ced215e3fda44ed849198a57cb (patch) | |
tree | 12d1ee877f4bbaf76333d44c0e9fbd853f52e9d0 /streaming/src | |
parent | 339441f545ca96de4a7d6271e32f16803539a8c8 (diff) | |
download | spark-0eb11527d13083ced215e3fda44ed849198a57cb.tar.gz spark-0eb11527d13083ced215e3fda44ed849198a57cb.tar.bz2 spark-0eb11527d13083ced215e3fda44ed849198a57cb.zip |
[STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false
Fix QueueInputDStream which was not removing dequeued items when used with the oneAtATime flag disabled.
Author: Manuel Laflamme <manuel.laflamme@gmail.com>
Closes #1285 from mlaflamm/spark-2343 and squashes the following commits:
61c9e38 [Manuel Laflamme] Unit tests for queue input stream
c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala | 2 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 92 |
2 files changed, 92 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 6376cff78b..ed7da6dc13 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag]( if (oneAtATime && queue.size > 0) { buffer += queue.dequeue() } else { - buffer ++= queue + buffer ++= queue.dequeueAll(_ => true) } if (buffer.size > 0) { if (oneAtATime) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index cd0aa4d0dc..cc4a65011d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -29,7 +29,7 @@ import java.nio.charset.Charset import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue} import com.google.common.io.Files import org.scalatest.BeforeAndAfter @@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.streaming.receiver.{ActorHelper, Receiver} +import org.apache.spark.rdd.RDD class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { @@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { logInfo("--------------------------------") assert(output.sum === numTotalRecords) } + + test("queue input stream - oneAtATime=true") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val queue = new SynchronizedQueue[RDD[String]]() + val queueStream = ssc.queueStream(queue, oneAtATime = true) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(queueStream, outputBuffer) + def output = outputBuffer.filter(_.size > 0) + outputStream.register() + ssc.start() + + // Setup data queued into the stream + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq("1", "2", "3", "4", "5") + val expectedOutput = input.map(Seq(_)) + //Thread.sleep(1000) + val inputIterator = input.toIterator + for (i <- 0 until input.size) { + // Enqueue more than 1 item per tick but they should dequeue one at a time + inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i))) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) + } + } + + test("queue input stream - oneAtATime=false") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val queue = new SynchronizedQueue[RDD[String]]() + val queueStream = ssc.queueStream(queue, oneAtATime = false) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(queueStream, outputBuffer) + def output = outputBuffer.filter(_.size > 0) + outputStream.register() + ssc.start() + + // Setup data queued into the stream + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq("1", "2", "3", "4", "5") + val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5")) + + // Enqueue the first 3 items (one by one), they should be merged in the next batch + val inputIterator = input.toIterator + inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i))) + clock.addToTime(batchDuration.milliseconds) + Thread.sleep(1000) + + // Enqueue the remaining items (again one by one), merged in the final batch + inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i))) + clock.addToTime(batchDuration.milliseconds) + Thread.sleep(1000) + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) + } + } } |