diff options
author | Manuel Laflamme <manuel.laflamme@gmail.com> | 2014-07-09 10:45:45 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-07-09 10:45:45 -0700 |
commit | 0eb11527d13083ced215e3fda44ed849198a57cb (patch) | |
tree | 12d1ee877f4bbaf76333d44c0e9fbd853f52e9d0 /streaming/src/main | |
parent | 339441f545ca96de4a7d6271e32f16803539a8c8 (diff) | |
download | spark-0eb11527d13083ced215e3fda44ed849198a57cb.tar.gz spark-0eb11527d13083ced215e3fda44ed849198a57cb.tar.bz2 spark-0eb11527d13083ced215e3fda44ed849198a57cb.zip |
[STREAMING] SPARK-2343: Fix QueueInputDStream with oneAtATime false
Fix QueueInputDStream which was not removing dequeued items when used with the oneAtATime flag disabled.
Author: Manuel Laflamme <manuel.laflamme@gmail.com>
Closes #1285 from mlaflamm/spark-2343 and squashes the following commits:
61c9e38 [Manuel Laflamme] Unit tests for queue input stream
c51d029 [Manuel Laflamme] Fix QueueInputDStream with oneAtATime false
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala | 2 |
1 files changed, 1 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 6376cff78b..ed7da6dc13 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag]( if (oneAtATime && queue.size > 0) { buffer += queue.dequeue() } else { - buffer ++= queue + buffer ++= queue.dequeueAll(_ => true) } if (buffer.size > 0) { if (oneAtATime) { |