author     Manuel Laflamme <manuel.laflamme@gmail.com>    2014-07-09 10:45:45 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-07-09 10:45:45 -0700
commit     0eb11527d13083ced215e3fda44ed849198a57cb (patch)
tree       12d1ee877f4bbaf76333d44c0e9fbd853f52e9d0 /streaming
parent     339441f545ca96de4a7d6271e32f16803539a8c8 (diff)
[STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false
Fix QueueInputDStream, which was not removing dequeued items when used with the oneAtATime flag disabled.

Author: Manuel Laflamme <manuel.laflamme@gmail.com>

Closes #1285 from mlaflamm/spark-2343 and squashes the following commits:

61c9e38 [Manuel Laflamme] Unit tests for queue input stream
c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false
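The root cause is plain Scala collections behavior: buffer ++= queue only iterates over the queue and copies its elements without consuming them, so with oneAtATime disabled every previously queued RDD was re-emitted in each later batch. queue.dequeueAll(_ => true) removes the matching elements and returns them. A minimal standalone sketch of the difference on a bare scala.collection.mutable.Queue (no Spark involved; the object name is just for illustration):

import scala.collection.mutable.{ArrayBuffer, Queue}

object DequeueAllDemo extends App {
  val queue = Queue("a", "b", "c")
  val buffer = ArrayBuffer.empty[String]

  // Buggy pattern: ++= only iterates; the queue keeps its elements,
  // so the next pass sees (and copies) them again.
  buffer ++= queue
  println(s"++= queue:       buffer=$buffer, queue=$queue")
  // buffer=ArrayBuffer(a, b, c), queue=Queue(a, b, c)

  buffer.clear()

  // Fixed pattern: dequeueAll removes every element matching the
  // predicate and returns them, leaving the queue empty.
  buffer ++= queue.dequeueAll(_ => true)
  println(s"dequeueAll(...): buffer=$buffer, queue=$queue")
  // buffer=ArrayBuffer(a, b, c), queue=Queue()
}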
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala         | 92
2 files changed, 92 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 6376cff78b..ed7da6dc13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag](
     if (oneAtATime && queue.size > 0) {
       buffer += queue.dequeue()
     } else {
-      buffer ++= queue
+      buffer ++= queue.dequeueAll(_ => true)
     }
     if (buffer.size > 0) {
       if (oneAtATime) {
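For context, a hedged usage sketch (not part of this patch) exercising the code path the fix changes; queueStream and SynchronizedQueue mirror the tests added below, while the master URL, app name, batch interval, and object name are arbitrary illustration choices:

import scala.collection.mutable.SynchronizedQueue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    val queue = new SynchronizedQueue[RDD[Int]]()
    // With oneAtATime = false, each batch folds in all RDDs queued so far.
    // Before this fix, the drained RDDs stayed in the queue and were
    // counted again in every subsequent batch.
    ssc.queueStream(queue, oneAtATime = false).count().print()

    ssc.start()
    queue += ssc.sparkContext.makeRDD(1 to 10)
    queue += ssc.sparkContext.makeRDD(11 to 20)
    ssc.awaitTermination(5000) // let a few batches run, then shut down
    ssc.stop()
  }
}

Design note: buffer ++= queue followed by queue.clear() would also stop the leak, but an RDD enqueued between the two calls would be dropped silently; dequeueAll(_ => true) removes and returns the elements in a single call, which SynchronizedQueue (used by the tests below) executes under its lock.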
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index cd0aa4d0dc..cc4a65011d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger

-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}

 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.rdd.RDD

 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
+
+  test("queue input stream - oneAtATime=true") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = true)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Setup data queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = input.map(Seq(_))
+    //Thread.sleep(1000)
+    val inputIterator = input.toIterator
+    for (i <- 0 until input.size) {
+      // Enqueue more than 1 item per tick but they should dequeue one at a time
+      inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+  test("queue input stream - oneAtATime=false") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val queue = new SynchronizedQueue[RDD[String]]()
+    val queueStream = ssc.queueStream(queue, oneAtATime = false)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(queueStream, outputBuffer)
+    def output = outputBuffer.filter(_.size > 0)
+    outputStream.register()
+    ssc.start()
+
+    // Setup data queued into the stream
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq("1", "2", "3", "4", "5")
+    val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
+
+    // Enqueue the first 3 items (one by one), they should be merged in the next batch
+    val inputIterator = input.toIterator
+    inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+
+    // Enqueue the remaining items (again one by one), merged in the final batch
+    inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+    clock.addToTime(batchDuration.milliseconds)
+    Thread.sleep(1000)
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
 }