author     Tathagata Das <tathagata.das1565@gmail.com>  2015-06-01 20:04:57 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-06-01 20:04:57 -0700
commit     2f9c7519d6a3f867100979b5e7ced3f72b7d9adc (patch)
tree       8e0888eb9ef1a4b9fccfddb3752f7251bcddde0c /streaming
parent     90c606925e7ec8f65f28e2290a0048f64af8c6a6 (diff)
[SPARK-7958] [STREAMING] Handled exception in StreamingContext.start() to prevent leaking of actors
StreamingContext.start() can throw an exception when DStream.validateAtStart() fails (say, because the checkpoint directory is not set for a StateDStream). But by then the JobScheduler, JobGenerator, and ReceiverTracker have already started, along with their actors. Those cannot be shut down, because the only way to do so is to call StreamingContext.stop(), which cannot be called since the context has not been marked as ACTIVE.

The solution in this PR is to stop the internal scheduler if start() throws an exception, and to mark the context as STOPPED.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6559 from tdas/SPARK-7958 and squashes the following commits:

20b2ec1 [Tathagata Das] Added synchronized
790b617 [Tathagata Das] Handled exception in StreamingContext.start()
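For readers skimming the diff below, the fix boils down to a wrap-and-roll-back pattern around the start sequence. A minimal, self-contained Scala sketch of that pattern follows; the Scheduler trait, the Context class, and every name in it are hypothetical stand-ins for illustration, not Spark's actual classes:

    import scala.util.control.NonFatal

    object StartRollbackSketch {
      sealed trait State
      case object Initialized extends State
      case object Active extends State
      case object Stopped extends State

      trait Scheduler {
        def start(): Unit  // may spin up threads/actors before a later step fails
        def stop(): Unit   // must release whatever start() managed to create
      }

      final class Context(scheduler: Scheduler) {
        private var state: State = Initialized

        private def validate(): Unit = ()  // may throw, e.g. missing checkpoint dir

        def start(): Unit = synchronized {
          try {
            validate()
            scheduler.start()
            state = Active
          } catch {
            case NonFatal(e) =>
              scheduler.stop()  // roll back so no threads/actors leak
              state = Stopped   // a later stop() call becomes a harmless no-op
              throw e           // still surface the original failure to the caller
          }
        }
      }
    }

Note that in this pattern stop() must tolerate being called on a scheduler whose start() never ran or failed partway through, which is exactly why the patch below can call scheduler.stop(false) unconditionally from the catch block.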
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala        | 17 +++++++++++++----
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala  |  4 ++++
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala   | 16 ++++++++++++++++
3 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 25842d5025..624a31ddc2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import scala.collection.Map
 import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
@@ -576,18 +577,26 @@ class StreamingContext private[streaming] (
   def start(): Unit = synchronized {
     state match {
       case INITIALIZED =>
-        validate()
         startSite.set(DStream.getCreationSite())
         sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
-          scheduler.start()
-          uiTab.foreach(_.attach())
-          state = StreamingContextState.ACTIVE
+          try {
+            validate()
+            scheduler.start()
+            state = StreamingContextState.ACTIVE
+          } catch {
+            case NonFatal(e) =>
+              logError("Error starting the context, marking it as stopped", e)
+              scheduler.stop(false)
+              state = StreamingContextState.STOPPED
+              throw e
+          }
           StreamingContext.setActiveContext(this)
         }
         shutdownHookRef = Utils.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+        uiTab.foreach(_.attach())
         logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1d1ddaaccf..4af9b6d3b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     eventLoop.post(ErrorReported(msg, e))
   }
 
+  def isStarted(): Boolean = synchronized {
+    eventLoop != null
+  }
+
   private def processEvent(event: JobSchedulerEvent) {
     try {
       event match {
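The new isStarted() leans on an existing invariant: JobScheduler creates its event loop in start() and clears it in stop(), so a non-null handle doubles as a liveness flag. A generic sketch of that idiom, with a plain Thread standing in for Spark's private EventLoop and all names illustrative:

    class LifecycleTracker {
      private var worker: Thread = null  // created in start(), cleared in stop()

      def start(): Unit = synchronized {
        if (worker != null) return  // already started; keep start() idempotent
        worker = new Thread(new Runnable { def run(): Unit = () })
        worker.start()
      }

      def stop(): Unit = synchronized {
        if (worker == null) return  // never started, or already stopped
        worker.interrupt()
        worker = null               // flips isStarted back to false
      }

      def isStarted(): Boolean = synchronized { worker != null }
    }

Synchronizing the read alongside start() and stop() (cf. the "Added synchronized" fixup commit above) keeps isStarted() from racing with a concurrent teardown.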
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d304c9a732..819dd2ccfe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
     assert(StreamingContext.getActive().isEmpty)
   }
 
+  test("start failure should stop internal components") {
+    ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = addInputStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    inputStream.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
+    // Require that the start fails because checkpoint directory was not set
+    intercept[Exception] {
+      ssc.start()
+    }
+    assert(ssc.getState() === StreamingContextState.STOPPED)
+    assert(ssc.scheduler.isStarted === false)
+  }
+
+
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
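A closing note on the new test: ScalaTest's intercept both asserts that the block throws and returns the caught exception, so the test could additionally assert on the failure message. A small illustration; the message text here is hypothetical, not what Spark actually throws:

    import org.scalatest.Assertions._

    val e = intercept[IllegalArgumentException] {
      require(false, "checkpoint directory not set")  // throws with a prefixed message
    }
    assert(e.getMessage.contains("checkpoint directory"))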