aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala27
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala4
2 files changed, 14 insertions, 17 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 2c5834defa..8461e90120 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -528,28 +528,27 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
*
- * @throws SparkException if the context has already been started or stopped.
+ * @throws SparkException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
- import StreamingContext._
state match {
case INITIALIZED =>
- // good to start
+ validate()
+ startSite.set(DStream.getCreationSite())
+ sparkContext.setCallSite(startSite.get)
+ StreamingContext.ACTIVATION_LOCK.synchronized {
+ StreamingContext.assertNoOtherContextIsActive()
+ scheduler.start()
+ uiTab.foreach(_.attach())
+ state = StreamingContextState.ACTIVE
+ StreamingContext.setActiveContext(this)
+ }
+ logInfo("StreamingContext started")
case ACTIVE =>
- throw new SparkException("StreamingContext has already been started")
+ logWarning("StreamingContext has already been started")
case STOPPED =>
throw new SparkException("StreamingContext has already been stopped")
}
- validate()
- startSite.set(DStream.getCreationSite())
- sparkContext.setCallSite(startSite.get)
- ACTIVATION_LOCK.synchronized {
- assertNoOtherContextIsActive()
- scheduler.start()
- uiTab.foreach(_.attach())
- state = StreamingContextState.ACTIVE
- setActiveContext(this)
- }
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index b8247db7e8..47299513de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
addInputStream(ssc).register()
ssc.start()
assert(ssc.getState() === StreamingContextState.ACTIVE)
- intercept[SparkException] {
- ssc.start()
- }
+ ssc.start()
assert(ssc.getState() === StreamingContextState.ACTIVE)
}