From 83ac9a4bbf272028d0c4639cbd1e12022b9ae77a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 8 Apr 2014 00:00:17 -0700 Subject: [SPARK-1331] Added graceful shutdown to Spark Streaming Current version of StreamingContext.stop() directly kills all the data receivers (NetworkReceiver) without waiting for the data already received to be persisted and processed. This PR provides the fix. Now, when the StreamingContext.stop() is called, the following sequence of steps will happen. 1. The driver will send a stop signal to all the active receivers. 2. Each receiver, when it gets a stop signal from the driver, first stop receiving more data, then waits for the thread that persists data blocks to BlockManager to finish persisting all receive data, and finally quits. 3. After all the receivers have stopped, the driver will wait for the Job Generator and Job Scheduler to finish processing all the received data. It also fixes the semantics of StreamingContext.start and stop. It will throw appropriate errors and warnings if stop() is called before start(), stop() is called twice, etc. Author: Tathagata Das Closes #247 from tdas/graceful-shutdown and squashes the following commits: 61c0016 [Tathagata Das] Updated MIMA binary check excludes. ae1d39b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into graceful-shutdown 6b59cfc [Tathagata Das] Minor changes based on Andrew's comment on PR. d0b8d65 [Tathagata Das] Reduced time taken by graceful shutdown unit test. f55bc67 [Tathagata Das] Fix scalastyle c69b3a7 [Tathagata Das] Updates based on Patrick's comments. c43b8ae [Tathagata Das] Added graceful shutdown to Spark Streaming. --- project/MimaBuild.scala | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) (limited to 'project/MimaBuild.scala') diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala index e7c9c47c96..5ea4817bfd 100644 --- a/project/MimaBuild.scala +++ b/project/MimaBuild.scala @@ -58,17 +58,19 @@ object MimaBuild { SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.0") => Seq( - excludePackage("org.apache.spark.api.java"), - excludePackage("org.apache.spark.streaming.api.java"), - excludePackage("org.apache.spark.mllib") - ) ++ - excludeSparkClass("rdd.ClassTags") ++ - excludeSparkClass("util.XORShiftRandom") ++ - excludeSparkClass("mllib.recommendation.MFDataGenerator") ++ - excludeSparkClass("mllib.optimization.SquaredGradient") ++ - excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++ - excludeSparkClass("mllib.regression.LassoWithSGD") ++ - excludeSparkClass("mllib.regression.LinearRegressionWithSGD") + excludePackage("org.apache.spark.api.java"), + excludePackage("org.apache.spark.streaming.api.java"), + excludePackage("org.apache.spark.mllib") + ) ++ + excludeSparkClass("rdd.ClassTags") ++ + excludeSparkClass("util.XORShiftRandom") ++ + excludeSparkClass("mllib.recommendation.MFDataGenerator") ++ + excludeSparkClass("mllib.optimization.SquaredGradient") ++ + excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++ + excludeSparkClass("mllib.regression.LassoWithSGD") ++ + excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++ + excludeSparkClass("streaming.dstream.NetworkReceiver") ++ + excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") case _ => Seq() } -- cgit v1.2.3