aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-02-09 11:23:29 +0000
committerSean Owen <sowen@cloudera.com>2016-02-09 11:23:29 +0000
commit68ed3632c56389ab3ff4ea5d73c575f224dab4f6 (patch)
tree44c60c327728148e8eaa307170a8dfc6554372ac /examples
parente30121afac35439be5d42c04da6f047f7d973dd6 (diff)
downloadspark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.gz
spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.bz2
spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.zip
[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizeQueue with synchronized access to a Queue Author: Sean Owen <sowen@cloudera.com> Closes #11111 from srowen/SPARK-13170.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala8
1 files changed, 5 insertions, 3 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
index 13ba9a43ec..5455aed220 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.examples.streaming
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable.Queue
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
@@ -34,7 +34,7 @@ object QueueStream {
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
- val rddQueue = new SynchronizedQueue[RDD[Int]]()
+ val rddQueue = new Queue[RDD[Int]]()
// Create the QueueInputDStream and use it do some processing
val inputStream = ssc.queueStream(rddQueue)
@@ -45,7 +45,9 @@ object QueueStream {
// Create and push some RDDs into
for (i <- 1 to 30) {
- rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+ rddQueue.synchronized {
+ rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+ }
Thread.sleep(1000)
}
ssc.stop()