aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorManuel Laflamme <manuel.laflamme@gmail.com>2014-07-09 10:45:45 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-09 10:45:45 -0700
commit0eb11527d13083ced215e3fda44ed849198a57cb (patch)
tree12d1ee877f4bbaf76333d44c0e9fbd853f52e9d0 /streaming/src/main
parent339441f545ca96de4a7d6271e32f16803539a8c8 (diff)
downloadspark-0eb11527d13083ced215e3fda44ed849198a57cb.tar.gz
spark-0eb11527d13083ced215e3fda44ed849198a57cb.tar.bz2
spark-0eb11527d13083ced215e3fda44ed849198a57cb.zip
[STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false
Fix QueueInputDStream which was not removing dequeued items when used with the oneAtATime flag disabled. Author: Manuel Laflamme <manuel.laflamme@gmail.com> Closes #1285 from mlaflamm/spark-2343 and squashes the following commits: 61c9e38 [Manuel Laflamme] Unit tests for queue input stream c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 6376cff78b..ed7da6dc13 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag](
if (oneAtATime && queue.size > 0) {
buffer += queue.dequeue()
} else {
- buffer ++= queue
+ buffer ++= queue.dequeueAll(_ => true)
}
if (buffer.size > 0) {
if (oneAtATime) {