diff options
author | Sean Owen <sowen@cloudera.com> | 2016-02-09 11:23:29 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-02-09 11:23:29 +0000 |
commit | 68ed3632c56389ab3ff4ea5d73c575f224dab4f6 (patch) | |
tree | 44c60c327728148e8eaa307170a8dfc6554372ac /examples | |
parent | e30121afac35439be5d42c04da6f047f7d973dd6 (diff) | |
download | spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.gz spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.bz2 spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.zip |
[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizeQueue with synchronized access to a Queue
Author: Sean Owen <sowen@cloudera.com>
Closes #11111 from srowen/SPARK-13170.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala index 13ba9a43ec..5455aed220 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.streaming -import scala.collection.mutable.SynchronizedQueue +import scala.collection.mutable.Queue import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD @@ -34,7 +34,7 @@ object QueueStream { // Create the queue through which RDDs can be pushed to // a QueueInputDStream - val rddQueue = new SynchronizedQueue[RDD[Int]]() + val rddQueue = new Queue[RDD[Int]]() // Create the QueueInputDStream and use it do some processing val inputStream = ssc.queueStream(rddQueue) @@ -45,7 +45,9 @@ object QueueStream { // Create and push some RDDs into for (i <- 1 to 30) { - rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10) + rddQueue.synchronized { + rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10) + } Thread.sleep(1000) } ssc.stop() |