path: root/streaming
author     Tathagata Das <tathagata.das1565@gmail.com>  2015-12-03 12:00:09 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-12-03 12:00:09 -0800
commit     a02d47277379e1e82d0ee41b2205434f9ffbc3e5 (patch)
tree       96dc36ab60130790793de3c024050289705323ed /streaming
parent     ad7cea6f776a39801d6bb5bb829d1800b175b2ab (diff)
[FLAKY-TEST-FIX][STREAMING][TEST] Make sure StreamingContexts are shutdown after test
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #10124 from tdas/InputStreamSuite-flaky-test.
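The patch wraps each test body in withStreamingContext from TestSuiteBase, a loan-pattern helper that stops the StreamingContext even when an assertion fails mid-test, so a leaked running context can no longer poison the tests that run after it. As a rough sketch of that pattern (the exact signature and stop() flags of the real helper in TestSuiteBase are assumptions here, not taken from this patch):

    import org.apache.spark.streaming.StreamingContext

    // Sketch of a loan-pattern helper; the real one is defined in
    // TestSuiteBase and may differ in signature and stop() arguments.
    def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = {
      try {
        block(ssc)
      } finally {
        // Stop unconditionally so a failing test body cannot leak a
        // live StreamingContext into later tests.
        ssc.stop()
      }
    }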
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 122
1 file changed, 61 insertions(+), 61 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 047e38ef90..3a3176b91b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -206,28 +206,28 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val numTotalRecords = numThreads * numRecordsPerThread
val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
MultiThreadTestReceiver.haveAllThreadsFinished = false
-
- // set up the network stream using the test receiver
- val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.receiverStream[Int](testReceiver)
- val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
- val outputStream = new TestOutputStream(countStream, outputBuffer)
def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
- outputStream.register()
- ssc.start()
-
- // Let the data from the receiver be received
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val startTime = System.currentTimeMillis()
- while((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
- System.currentTimeMillis() - startTime < 5000) {
- Thread.sleep(100)
- clock.advance(batchDuration.milliseconds)
+
+ // set up the network stream using the test receiver
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val networkStream = ssc.receiverStream[Int](testReceiver)
+ val countStream = networkStream.count
+
+ val outputStream = new TestOutputStream(countStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Let the data from the receiver be received
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val startTime = System.currentTimeMillis()
+ while ((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
+ System.currentTimeMillis() - startTime < 5000) {
+ Thread.sleep(100)
+ clock.advance(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
}
- Thread.sleep(1000)
- logInfo("Stopping context")
- ssc.stop()
// Verify whether data received was as expected
logInfo("--------------------------------")
@@ -239,30 +239,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("queue input stream - oneAtATime = true") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val queue = new SynchronizedQueue[RDD[String]]()
- val queueStream = ssc.queueStream(queue, oneAtATime = true)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(queueStream, outputBuffer)
- def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
- outputStream.register()
- ssc.start()
-
- // Setup data queued into the stream
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
- val inputIterator = input.toIterator
- for (i <- 0 until input.size) {
- // Enqueue more than 1 item per tick but they should dequeue one at a time
- inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.advance(batchDuration.milliseconds)
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val queue = new SynchronizedQueue[RDD[String]]()
+ val queueStream = ssc.queueStream(queue, oneAtATime = true)
+ val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Setup data queued into the stream
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+
+ val inputIterator = input.toIterator
+ for (i <- 0 until input.size) {
+ // Enqueue more than 1 item per tick but they should dequeue one at a time
+ inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.advance(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
}
- Thread.sleep(1000)
- logInfo("Stopping context")
- ssc.stop()
// Verify whether data received was as expected
logInfo("--------------------------------")
@@ -282,33 +282,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("queue input stream - oneAtATime = false") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val queue = new SynchronizedQueue[RDD[String]]()
- val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
- outputStream.register()
- ssc.start()
-
- // Setup data queued into the stream
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
- // Enqueue the first 3 items (one by one), they should be merged in the next batch
- val inputIterator = input.toIterator
- inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.advance(batchDuration.milliseconds)
- Thread.sleep(1000)
-
- // Enqueue the remaining items (again one by one), merged in the final batch
- inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
- clock.advance(batchDuration.milliseconds)
- Thread.sleep(1000)
- logInfo("Stopping context")
- ssc.stop()
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val queue = new SynchronizedQueue[RDD[String]]()
+ val queueStream = ssc.queueStream(queue, oneAtATime = false)
+ val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Setup data queued into the stream
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+
+ // Enqueue the first 3 items (one by one), they should be merged in the next batch
+ val inputIterator = input.toIterator
+ inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.advance(batchDuration.milliseconds)
+ Thread.sleep(1000)
+
+ // Enqueue the remaining items (again one by one), merged in the final batch
+ inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.advance(batchDuration.milliseconds)
+ Thread.sleep(1000)
+ }
// Verify whether data received was as expected
logInfo("--------------------------------")