aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-02-09 11:23:29 +0000
committerSean Owen <sowen@cloudera.com>2016-02-09 11:23:29 +0000
commit68ed3632c56389ab3ff4ea5d73c575f224dab4f6 (patch)
tree44c60c327728148e8eaa307170a8dfc6554372ac /streaming/src/test
parente30121afac35439be5d42c04da6f047f7d973dd6 (diff)
downloadspark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.gz
spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.tar.bz2
spark-68ed3632c56389ab3ff4ea5d73c575f224dab4f6.zip
[SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is deprecated
Replace SynchronizeQueue with synchronized access to a Queue Author: Sean Owen <sowen@cloudera.com> Closes #11111 from srowen/SPARK-13170.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala37
1 files changed, 24 insertions, 13 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93c883362c..fa17b3a15c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable
import scala.language.postfixOps
import com.google.common.io.Files
@@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.apache.spark.util.{ManualClock, Utils}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val expectedOutput = input.map(_.toString)
- for (i <- 0 until input.size) {
+ for (i <- input.indices) {
testServer.send(input(i).toString + "\n")
Thread.sleep(500)
clock.advance(batchDuration.milliseconds)
@@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
+ assert(output.length === expectedOutput.size)
+ for (i <- output.indices) {
assert(output(i) === expectedOutput(i))
}
}
@@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
- def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+ def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
- val queue = new SynchronizedQueue[RDD[String]]()
+ val queue = new mutable.Queue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputStream = new TestOutputStream(queueStream, outputQueue)
outputStream.register()
@@ -256,9 +255,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val inputIterator = input.toIterator
- for (i <- 0 until input.size) {
+ for (i <- input.indices) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
- inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ inputIterator.take(2).foreach { i =>
+ queue.synchronized {
+ queue += ssc.sparkContext.makeRDD(Seq(i))
+ }
+ }
clock.advance(batchDuration.milliseconds)
}
Thread.sleep(1000)
@@ -281,13 +284,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("queue input stream - oneAtATime = false") {
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
- def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+ def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
- val queue = new SynchronizedQueue[RDD[String]]()
+ val queue = new mutable.Queue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputStream = new TestOutputStream(queueStream, outputQueue)
outputStream.register()
@@ -298,12 +301,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enqueue the first 3 items (one by one), they should be merged in the next batch
val inputIterator = input.toIterator
- inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ inputIterator.take(3).foreach { i =>
+ queue.synchronized {
+ queue += ssc.sparkContext.makeRDD(Seq(i))
+ }
+ }
clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
// Enqueue the remaining items (again one by one), merged in the final batch
- inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ inputIterator.foreach { i =>
+ queue.synchronized {
+ queue += ssc.sparkContext.makeRDD(Seq(i))
+ }
+ }
clock.advance(batchDuration.milliseconds)
Thread.sleep(1000)
}