diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-08 00:00:17 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-08 00:00:17 -0700 |
commit | 83ac9a4bbf272028d0c4639cbd1e12022b9ae77a (patch) | |
tree | 00a8c8f581e2f681463126a7e8a34993a790c692 /streaming/src/test | |
parent | 11eabbe125b2ee572fad359c33c93f5e6fdf0b2d (diff) | |
download | spark-83ac9a4bbf272028d0c4639cbd1e12022b9ae77a.tar.gz spark-83ac9a4bbf272028d0c4639cbd1e12022b9ae77a.tar.bz2 spark-83ac9a4bbf272028d0c4639cbd1e12022b9ae77a.zip |
[SPARK-1331] Added graceful shutdown to Spark Streaming
Current version of StreamingContext.stop() directly kills all the data receivers (NetworkReceiver) without waiting for the data already received to be persisted and processed. This PR provides the fix. Now, when the StreamingContext.stop() is called, the following sequence of steps will happen.
1. The driver will send a stop signal to all the active receivers.
2. Each receiver, when it gets a stop signal from the driver, first stop receiving more data, then waits for the thread that persists data blocks to BlockManager to finish persisting all receive data, and finally quits.
3. After all the receivers have stopped, the driver will wait for the Job Generator and Job Scheduler to finish processing all the received data.
It also fixes the semantics of StreamingContext.start and stop. It will throw appropriate errors and warnings if stop() is called before start(), stop() is called twice, etc.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #247 from tdas/graceful-shutdown and squashes the following commits:
61c0016 [Tathagata Das] Updated MIMA binary check excludes.
ae1d39b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into graceful-shutdown
6b59cfc [Tathagata Das] Minor changes based on Andrew's comment on PR.
d0b8d65 [Tathagata Das] Reduced time taken by graceful shutdown unit test.
f55bc67 [Tathagata Das] Fix scalastyle
c69b3a7 [Tathagata Das] Updates based on Patrick's comments.
c43b8ae [Tathagata Das] Added graceful shutdown to Spark Streaming.
Diffstat (limited to 'streaming/src/test')
3 files changed, 98 insertions, 16 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index bcb0c28bf0..bb73dbf29b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -324,7 +324,7 @@ class BasicOperationsSuite extends TestSuiteBase { val updateStateOperation = (s: DStream[String]) => { val updateFunc = (values: Seq[Int], state: Option[Int]) => { - Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) + Some(values.sum + state.getOrElse(0)) } s.map(x => (x, 1)).updateStateByKey[Int](updateFunc) } @@ -359,7 +359,7 @@ class BasicOperationsSuite extends TestSuiteBase { // updateFunc clears a state when a StateObject is seen without new values twice in a row val updateFunc = (values: Seq[Int], state: Option[StateObject]) => { val stateObj = state.getOrElse(new StateObject) - values.foldLeft(0)(_ + _) match { + values.sum match { case 0 => stateObj.expireCounter += 1 // no new values case n => { // has new values, increment and reset expireCounter stateObj.counter += n diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 717da8e004..9cc27ef7f0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -17,19 +17,22 @@ package org.apache.spark.streaming -import org.scalatest.{FunSuite, BeforeAndAfter} -import org.scalatest.exceptions.TestFailedDueToTimeoutException +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver} +import org.apache.spark.util.{MetadataCleaner, Utils} +import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Timeouts +import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkException, SparkConf, SparkContext} -import org.apache.spark.util.{Utils, MetadataCleaner} -import org.apache.spark.streaming.dstream.DStream -class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { +class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { val master = "local[2]" val appName = this.getClass.getSimpleName - val batchDuration = Seconds(1) + val batchDuration = Milliseconds(500) val sparkHome = "someDir" val envPair = "key" -> "value" val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100 @@ -108,19 +111,31 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) myConf.set("spark.cleaner.ttl", ttl.toString) val ssc1 = new StreamingContext(myConf, batchDuration) + addInputStream(ssc1).register + ssc1.start() val cp = new Checkpoint(ssc1, Time(1000)) assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl) ssc1.stop() val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl) - ssc = new StreamingContext(null, cp, null) + ssc = new StreamingContext(null, newCp, null) assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl) } - test("start multiple times") { + test("start and stop state check") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register + assert(ssc.state === ssc.StreamingContextState.Initialized) + ssc.start() + assert(ssc.state === ssc.StreamingContextState.Started) + ssc.stop() + assert(ssc.state === ssc.StreamingContextState.Stopped) + } + + test("start multiple times") { + ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register ssc.start() intercept[SparkException] { ssc.start() @@ -133,18 +148,61 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.start() ssc.stop() ssc.stop() - ssc = null } + test("stop before start and start after stop") { + ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register + ssc.stop() // stop before start should not throw exception + ssc.start() + ssc.stop() + intercept[SparkException] { + ssc.start() // start after stop should throw exception + } + } + + test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) sc = ssc.sparkContext addInputStream(ssc).register ssc.start() ssc.stop(false) - ssc = null assert(sc.makeRDD(1 to 100).collect().size === 100) ssc = new StreamingContext(sc, batchDuration) + addInputStream(ssc).register + ssc.start() + ssc.stop() + } + + test("stop gracefully") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set("spark.cleaner.ttl", "3600") + sc = new SparkContext(conf) + for (i <- 1 to 4) { + logInfo("==================================") + ssc = new StreamingContext(sc, batchDuration) + var runningCount = 0 + TestReceiver.counter.set(1) + val input = ssc.networkStream(new TestReceiver) + input.count.foreachRDD(rdd => { + val count = rdd.first() + logInfo("Count = " + count) + runningCount += count.toInt + }) + ssc.start() + ssc.awaitTermination(500) + ssc.stop(stopSparkContext = false, stopGracefully = true) + logInfo("Running count = " + runningCount) + logInfo("TestReceiver.counter = " + TestReceiver.counter.get()) + assert(runningCount > 0) + assert( + (TestReceiver.counter.get() == runningCount + 1) || + (TestReceiver.counter.get() == runningCount + 2), + "Received records = " + TestReceiver.counter.get() + ", " + + "processed records = " + runningCount + ) + } } test("awaitTermination") { @@ -199,7 +257,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("awaitTermination with error in job generation") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register val exception = intercept[TestException] { ssc.start() @@ -215,4 +272,29 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { } } -class TestException(msg: String) extends Exception(msg)
\ No newline at end of file +class TestException(msg: String) extends Exception(msg) + +/** Custom receiver for testing whether all data received by a receiver gets processed or not */ +class TestReceiver extends NetworkReceiver[Int] { + protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY) + protected def onStart() { + blockGenerator.start() + logInfo("BlockGenerator started on thread " + receivingThread) + try { + while(true) { + blockGenerator += TestReceiver.counter.getAndIncrement + Thread.sleep(0) + } + } finally { + logInfo("Receiving stopped at count value of " + TestReceiver.counter.get()) + } + } + + protected def onStop() { + blockGenerator.stop() + } +} + +object TestReceiver { + val counter = new AtomicInteger(1) +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 201630672a..aa2d5c2fc2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -277,7 +277,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") - Thread.sleep(500) // Give some time for the forgetting old RDDs to complete + Thread.sleep(100) // Give some time for the forgetting old RDDs to complete } catch { case e: Exception => {e.printStackTrace(); throw e} } finally { |