 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala       | 17 +++++++++++++----
 streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala |  4 ++++
 streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  | 16 ++++++++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 25842d5025..624a31ddc2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map
import scala.collection.mutable.Queue
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
@@ -576,18 +577,26 @@ class StreamingContext private[streaming] (
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
- validate()
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
- scheduler.start()
- uiTab.foreach(_.attach())
- state = StreamingContextState.ACTIVE
+ try {
+ validate()
+ scheduler.start()
+ state = StreamingContextState.ACTIVE
+ } catch {
+ case NonFatal(e) =>
+ logError("Error starting the context, marking it as stopped", e)
+ scheduler.stop(false)
+ state = StreamingContextState.STOPPED
+ throw e
+ }
StreamingContext.setActiveContext(this)
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+ uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
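The restructured start() above wraps validate() and scheduler.start() in a try/catch on NonFatal, so any startup failure rolls the scheduler back and leaves the context in STOPPED rather than half-started; the UI tab is now attached only after activation has succeeded. Below is a minimal, self-contained sketch of that cleanup pattern — Service, Context, and State are hypothetical stand-ins for JobScheduler, StreamingContext, and StreamingContextState, not Spark APIs:

    import scala.util.control.NonFatal

    object State extends Enumeration { val Initialized, Active, Stopped = Value }

    // Hypothetical stand-in for an internal component such as JobScheduler.
    class Service {
      def start(): Unit = ()
      def stop(graceful: Boolean): Unit = ()
    }

    class Context {
      private val service = new Service
      @volatile private var state = State.Initialized

      def start(): Unit = synchronized {
        try {
          service.start()
          state = State.Active      // reached only when startup fully succeeded
        } catch {
          case NonFatal(e) =>
            service.stop(false)     // roll back whatever partially started
            state = State.Stopped   // never leave the context half-started
            throw e                 // surface the original failure to the caller
        }
      }
    }

Catching NonFatal rather than Throwable matters here: fatal errors such as OutOfMemoryError still propagate untouched instead of being masked by the cleanup path.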
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1d1ddaaccf..4af9b6d3b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
eventLoop.post(ErrorReported(msg, e))
}
+ def isStarted(): Boolean = synchronized {
+ eventLoop != null
+ }
+
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
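The new isStarted() uses the event loop reference itself as the liveness flag: in JobScheduler, eventLoop is assigned in start() and cleared in stop(), so a non-null value means the scheduler is running. A short sketch of that null-as-sentinel idiom, with a hypothetical Worker standing in for the scheduler's EventLoop:

    // Hypothetical Worker standing in for EventLoop[JobSchedulerEvent].
    class Worker {
      def start(): Unit = ()
      def stop(): Unit = ()
    }

    class Scheduler {
      private var worker: Worker = null   // non-null only between start() and stop()

      def start(): Unit = synchronized {
        if (worker != null) return        // already started; keep start idempotent
        worker = new Worker
        worker.start()
      }

      def stop(): Unit = synchronized {
        if (worker == null) return        // already stopped
        worker.stop()
        worker = null                     // cleared so isStarted() flips back
      }

      def isStarted(): Boolean = synchronized { worker != null }
    }

The synchronized wrapper mirrors the patch: without it, a caller could observe a stale reference while another thread is midway through start() or stop().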
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d304c9a732..819dd2ccfe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(StreamingContext.getActive().isEmpty)
}
+ test("start failure should stop internal components") {
+ ssc = new StreamingContext(conf, batchDuration)
+ val inputStream = addInputStream(ssc)
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some(values.sum + state.getOrElse(0))
+ }
+ inputStream.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
+ // The start should fail because the checkpoint directory was not set
+ intercept[Exception] {
+ ssc.start()
+ }
+ assert(ssc.getState() === StreamingContextState.STOPPED)
+ assert(ssc.scheduler.isStarted === false)
+ }
+
+
test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
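The new test exercises the failure path end to end: updateStateByKey requires a checkpoint directory, so validate() throws inside start(), and the assertions confirm the context ends up STOPPED with the scheduler never started. Below is a hedged usage sketch of what this buys callers (StartFailureDemo is hypothetical; it assumes a Spark 1.4-era streaming dependency, and it triggers the failure differently from the test — an empty graph makes validate() reject the start because no output operations are registered):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

    object StartFailureDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("StartFailureDemo")
        val ssc = new StreamingContext(conf, Seconds(1))
        // No streams registered, so validate() rejects the empty graph.
        try {
          ssc.start()
        } catch {
          case e: Exception =>
            // With this patch, the context marked itself STOPPED before rethrowing,
            // so state-dependent cleanup and retries behave predictably.
            assert(ssc.getState() == StreamingContextState.STOPPED)
        }
        ssc.stop()
      }
    }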