diff options
author | Sean Owen <sowen@cloudera.com> | 2016-02-09 11:23:29 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-02-09 11:23:29 +0000 |
commit | 68ed3632c56389ab3ff4ea5d73c575f224dab4f6 (patch) | |
tree | 44c60c327728148e8eaa307170a8dfc6554372ac /streaming/src/main | |
parent | e30121afac35439be5d42c04da6f047f7d973dd6 (diff) | |
download | spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.gz spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.bz2 spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.zip |
[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizedQueue with synchronized access to a Queue
Author: Sean Owen <sowen@cloudera.com>
Closes #11111 from srowen/SPARK-13170.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala | 13 |
2 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 32bea88ec6..a1b25c9f7d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -459,7 +459,7 @@ class StreamingContext private[streaming] ( * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of * those RDDs, so `queueStream` doesn't support checkpointing. * - * @param queue Queue of RDDs + * @param queue Queue of RDDs. Modifications to this data structure must be synchronized. * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval * @tparam T Type of objects in the RDD */ @@ -477,7 +477,7 @@ class StreamingContext private[streaming] ( * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of * those RDDs, so `queueStream` doesn't support checkpointing. * - * @param queue Queue of RDDs + * @param queue Queue of RDDs. Modifications to this data structure must be synchronized. * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. 
* Set as null if no RDD should be returned when empty diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index a8d108de6c..f9c7869916 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag]( override def compute(validTime: Time): Option[RDD[T]] = { val buffer = new ArrayBuffer[RDD[T]]() - if (oneAtATime && queue.size > 0) { - buffer += queue.dequeue() - } else { - buffer ++= queue.dequeueAll(_ => true) + queue.synchronized { + if (oneAtATime && queue.nonEmpty) { + buffer += queue.dequeue() + } else { + buffer ++= queue + queue.clear() + } } - if (buffer.size > 0) { + if (buffer.nonEmpty) { if (oneAtATime) { Some(buffer.head) } else { |