author     Sean Owen <sowen@cloudera.com>  2016-02-09 11:23:29 +0000
committer  Sean Owen <sowen@cloudera.com>  2016-02-09 11:23:29 +0000
commit     68ed3632c56389ab3ff4ea5d73c575f224dab4f6 (patch)
tree       44c60c327728148e8eaa307170a8dfc6554372ac /streaming
parent     e30121afac35439be5d42c04da6f047f7d973dd6 (diff)
[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizedQueue with synchronized access to a Queue

Author: Sean Owen <sowen@cloudera.com>

Closes #11111 from srowen/SPARK-13170.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala           |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala  | 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala          | 37
3 files changed, 34 insertions, 20 deletions
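
The change replaces the deprecated scala.collection.mutable.SynchronizedQueue (deprecated as of Scala 2.11) with a plain mutable.Queue whose every read and mutation is wrapped in queue.synchronized. A minimal sketch of that pattern outside Spark (object and thread names are illustrative, not part of this patch):

    import scala.collection.mutable

    object SynchronizedQueuePattern {
      def main(args: Array[String]): Unit = {
        val queue = new mutable.Queue[Int]()

        // Producer thread: every mutation of the shared queue is guarded
        // by synchronizing on the queue object itself.
        val producer = new Thread(new Runnable {
          override def run(): Unit =
            for (i <- 1 to 5) queue.synchronized { queue += i }
        })

        // Consumer thread: the nonEmpty check and the dequeue sit in the
        // same synchronized block, so they are atomic together. The
        // busy-wait is acceptable only for a sketch.
        val consumer = new Thread(new Runnable {
          override def run(): Unit = {
            var seen = 0
            while (seen < 5) queue.synchronized {
              if (queue.nonEmpty) { println(queue.dequeue()); seen += 1 }
            }
          }
        })

        producer.start(); consumer.start()
        producer.join(); consumer.join()
      }
    }

SynchronizedQueue baked the locking into the collection via a mixin trait; after deprecation, explicit external synchronization like the above is the idiomatic replacement, and it is exactly what this commit applies in the three files below.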
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 32bea88ec6..a1b25c9f7d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -459,7 +459,7 @@ class StreamingContext private[streaming] (
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
*
- * @param queue Queue of RDDs
+ * @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
@@ -477,7 +477,7 @@ class StreamingContext private[streaming] (
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
*
- * @param queue Queue of RDDs
+ * @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
* Set as null if no RDD should be returned when empty
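
With the doc change above, the synchronization contract moves to the caller: anyone feeding a queueStream must guard their own enqueues, because QueueInputDStream now synchronizes on the same queue object when draining it. A sketch of a conforming caller (the SparkConf setup is assumed for illustration, not part of this patch):

    import scala.collection.mutable
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    val rddQueue = new mutable.Queue[RDD[Int]]()
    val stream = ssc.queueStream(rddQueue, oneAtATime = true)
    stream.print()
    ssc.start()

    // Per the amended scaladoc, mutations must be synchronized: the streaming
    // scheduler thread drains this queue concurrently with this driver thread.
    for (_ <- 1 to 3) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 100)
      }
      Thread.sleep(1000)
    }
    ssc.stop()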
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a8d108de6c..f9c7869916 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
- if (oneAtATime && queue.size > 0) {
- buffer += queue.dequeue()
- } else {
- buffer ++= queue.dequeueAll(_ => true)
+ queue.synchronized {
+ if (oneAtATime && queue.nonEmpty) {
+ buffer += queue.dequeue()
+ } else {
+ buffer ++= queue
+ queue.clear()
+ }
}
- if (buffer.size > 0) {
+ if (buffer.nonEmpty) {
if (oneAtATime) {
Some(buffer.head)
} else {
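
A note on the drain logic in this hunk: the old `queue.dequeueAll(_ => true)` removed and returned every element via a predicate scan, while the replacement snapshots the queue into the buffer and then clears it, which is equivalent but keeps both steps under one lock. A standalone sketch of the two branches (helper names are illustrative, not Spark APIs):

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    // Drain everything: copy then clear inside a single synchronized block,
    // so no enqueue can interleave between the snapshot and the clear.
    def drainAll[T](queue: mutable.Queue[T]): Seq[T] = queue.synchronized {
      val buffer = new ArrayBuffer[T]()
      buffer ++= queue
      queue.clear()
      buffer
    }

    // Drain at most one element: the nonEmpty check and the dequeue must
    // share one block, or another thread could empty the queue in between.
    def drainOne[T](queue: mutable.Queue[T]): Option[T] = queue.synchronized {
      if (queue.nonEmpty) Some(queue.dequeue()) else None
    }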
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93c883362c..fa17b3a15c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable
import scala.language.postfixOps
import com.google.common.io.Files
@@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.apache.spark.util.{ManualClock, Utils}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val expectedOutput = input.map(_.toString)
- for (i <- 0 until input.size) {
+ for (i <- input.indices) {
testServer.send(input(i).toString + "\n")
Thread.sleep(500)
clock.advance(batchDuration.milliseconds)
@@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
+ assert(output.length === expectedOutput.size)
+ for (i <- output.indices) {
assert(output(i) === expectedOutput(i))
}
}
@@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
- def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+ def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
- val queue = new SynchronizedQueue[RDD[String]]()
+ val queue = new mutable.Queue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputStream = new TestOutputStream(queueStream, outputQueue)
outputStream.register()
@@ -256,9 +255,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val inputIterator = input.toIterator
- for (i <- 0 until input.size) {
+ for (i <- input.indices) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
- inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ inputIterator.take(2).foreach { i =>
+ queue.synchronized {
+ queue += ssc.sparkContext.makeRDD(Seq(i))
+ }
+ }
clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
@@ -281,13 +284,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("queue input stream - oneAtATime = false") {
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
- def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+ def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
- val queue = new SynchronizedQueue[RDD[String]]()
+ val queue = new mutable.Queue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputStream = new TestOutputStream(queueStream, outputQueue)
outputStream.register()
@@ -298,12 +301,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enqueue the first 3 items (one by one), they should be merged in the next batch
val inputIterator = input.toIterator
- inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ inputIterator.take(3).foreach { i =>
+ queue.synchronized {
+ queue += ssc.sparkContext.makeRDD(Seq(i))
+ }
+ }
clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
// Enqueue the remaining items (again one by one), merged in the final batch
- inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ inputIterator.foreach { i =>
+ queue.synchronized {
+ queue += ssc.sparkContext.makeRDD(Seq(i))
+ }
+ }
clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
}