aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-02-09 11:23:29 +0000
committerSean Owen <sowen@cloudera.com>2016-02-09 11:23:29 +0000
commit68ed3632c56389ab3ff4ea5d73c575f224dab4f6 (patch)
tree44c60c327728148e8eaa307170a8dfc6554372ac /streaming/src/main
parente30121afac35439be5d42c04da6f047f7d973dd6 (diff)
downloadspark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.gz
spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.bz2
spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.zip
[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizedQueue with synchronized access to a Queue. Author: Sean Owen <sowen@cloudera.com>. Closes #11111 from srowen/SPARK-13170.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala 13
2 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 32bea88ec6..a1b25c9f7d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -459,7 +459,7 @@ class StreamingContext private[streaming] (
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
*
- * @param queue Queue of RDDs
+ * @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
@@ -477,7 +477,7 @@ class StreamingContext private[streaming] (
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
*
- * @param queue Queue of RDDs
+ * @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
* Set as null if no RDD should be returned when empty
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a8d108de6c..f9c7869916 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
- if (oneAtATime && queue.size > 0) {
- buffer += queue.dequeue()
- } else {
- buffer ++= queue.dequeueAll(_ => true)
+ queue.synchronized {
+ if (oneAtATime && queue.nonEmpty) {
+ buffer += queue.dequeue()
+ } else {
+ buffer ++= queue
+ queue.clear()
+ }
}
- if (buffer.size > 0) {
+ if (buffer.nonEmpty) {
if (oneAtATime) {
Some(buffer.head)
} else {