diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-06-01 20:04:57 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-06-01 20:04:57 -0700 |
commit | 2f9c7519d6a3f867100979b5e7ced3f72b7d9adc (patch) | |
tree | 8e0888eb9ef1a4b9fccfddb3752f7251bcddde0c /streaming/src | |
parent | 90c606925e7ec8f65f28e2290a0048f64af8c6a6 (diff) | |
download | spark-2f9c7519d6a3f867100979b5e7ced3f72b7d9adc.tar.gz spark-2f9c7519d6a3f867100979b5e7ced3f72b7d9adc.tar.bz2 spark-2f9c7519d6a3f867100979b5e7ced3f72b7d9adc.zip |
[SPARK-7958] [STREAMING] Handled exception in StreamingContext.start() to prevent leaking of actors
StreamingContext.start() can throw exception because DStream.validateAtStart() fails (say, checkpoint directory not set for StateDStream). But by then JobScheduler, JobGenerator, and ReceiverTracker has already started, along with their actors. But those cannot be shutdown because the only way to do that is call StreamingContext.stop() which cannot be called as the context has not been marked as ACTIVE.
The solution in this PR is to stop the internal scheduler if start throw exception, and mark the context as STOPPED.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6559 from tdas/SPARK-7958 and squashes the following commits:
20b2ec1 [Tathagata Das] Added synchronized
790b617 [Tathagata Das] Handled exception in StreamingContext.start()
Diffstat (limited to 'streaming/src')
3 files changed, 33 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 25842d5025..624a31ddc2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.Map import scala.collection.mutable.Queue import scala.reflect.ClassTag +import scala.util.control.NonFatal import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration @@ -576,18 +577,26 @@ class StreamingContext private[streaming] ( def start(): Unit = synchronized { state match { case INITIALIZED => - validate() startSite.set(DStream.getCreationSite()) sparkContext.setCallSite(startSite.get) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() - scheduler.start() - uiTab.foreach(_.attach()) - state = StreamingContextState.ACTIVE + try { + validate() + scheduler.start() + state = StreamingContextState.ACTIVE + } catch { + case NonFatal(e) => + logError("Error starting the context, marking it as stopped", e) + scheduler.stop(false) + state = StreamingContextState.STOPPED + throw e + } StreamingContext.setActiveContext(this) } shutdownHookRef = Utils.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 1d1ddaaccf..4af9b6d3b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { eventLoop.post(ErrorReported(msg, e)) } + def isStarted(): Boolean = synchronized { + eventLoop != null + } + private def processEvent(event: JobSchedulerEvent) { try { event match { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index d304c9a732..819dd2ccfe 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo assert(StreamingContext.getActive().isEmpty) } + test("start failure should stop internal components") { + ssc = new StreamingContext(conf, batchDuration) + val inputStream = addInputStream(ssc) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.sum + state.getOrElse(0)) + } + inputStream.map(x => (x, 1)).updateStateByKey[Int](updateFunc) + // Require that the start fails because checkpoint directory was not set + intercept[Exception] { + ssc.start() + } + assert(ssc.getState() === StreamingContextState.STOPPED) + assert(ssc.scheduler.isStarted === false) + } + + test("start multiple times") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() |