Diffstat (limited to 'examples')
 examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
index 13ba9a43ec..5455aed220 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.examples.streaming
 
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable.Queue
 
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
@@ -34,7 +34,7 @@ object QueueStream {
 
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
-    val rddQueue = new SynchronizedQueue[RDD[Int]]()
+    val rddQueue = new Queue[RDD[Int]]()
 
     // Create the QueueInputDStream and use it do some processing
     val inputStream = ssc.queueStream(rddQueue)
@@ -45,7 +45,9 @@ object QueueStream {
 
     // Create and push some RDDs into
     for (i <- 1 to 30) {
-      rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      rddQueue.synchronized {
+        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      }
       Thread.sleep(1000)
     }
     ssc.stop()
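
Note on the change: scala.collection.mutable.SynchronizedQueue is deprecated since Scala 2.11, so the example switches to a plain mutable.Queue and wraps the producer-side mutation in rddQueue.synchronized; the queue is shared with the QueueInputDStream, which dequeues batches from another driver thread. Below is a minimal standalone sketch of the resulting pattern; the object name and the omission of the example's log-level setup are my own simplifications, not part of this commit.

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical standalone driver illustrating the Queue + synchronized pattern.
object QueueStreamSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Plain mutable.Queue; every mutation below is guarded by
    // rddQueue.synchronized because the DStream reads the queue
    // from a different driver thread.
    val rddQueue = new Queue[RDD[Int]]()

    // Each batch interval, queueStream dequeues one RDD and processes it.
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
    ssc.start()

    // Push one RDD per second for 30 seconds, holding the queue's
    // monitor while enqueueing.
    for (i <- 1 to 30) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
      }
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}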