diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-12 08:48:24 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-12 08:48:24 -0700 |
commit | ec6f2a9774167014566fb9608ee4394d2ce5fd6a (patch) | |
tree | 61b2427df3f18edb03a1312f6fa9457f8f83b890 /streaming | |
parent | f3e8e60063ccf0d713d03e671a3231560475f90d (diff) | |
download | spark-ec6f2a9774167014566fb9608ee4394d2ce5fd6a.tar.gz spark-ec6f2a9774167014566fb9608ee4394d2ce5fd6a.tar.bz2 spark-ec6f2a9774167014566fb9608ee4394d2ce5fd6a.zip |
[SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6060 from tdas/SPARK-7532 and squashes the following commits:
6fe2e83 [Tathagata Das] Update docs
7dadfc3 [Tathagata Das] Fixed bug again
99c7678 [Tathagata Das] Added logInfo
65aec20 [Tathagata Das] Fix bug
5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532
1a9a818 [Tathagata Das] Fix scaladoc
c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 27 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4 |
2 files changed, 14 insertions, 17 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 2c5834defa..8461e90120 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -528,28 +528,27 @@ class StreamingContext private[streaming] ( /** * Start the execution of the streams. * - * @throws SparkException if the context has already been started or stopped. + * @throws SparkException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { - import StreamingContext._ state match { case INITIALIZED => - // good to start + validate() + startSite.set(DStream.getCreationSite()) + sparkContext.setCallSite(startSite.get) + StreamingContext.ACTIVATION_LOCK.synchronized { + StreamingContext.assertNoOtherContextIsActive() + scheduler.start() + uiTab.foreach(_.attach()) + state = StreamingContextState.ACTIVE + StreamingContext.setActiveContext(this) + } + logInfo("StreamingContext started") case ACTIVE => - throw new SparkException("StreamingContext has already been started") + logWarning("StreamingContext has already been started") case STOPPED => throw new SparkException("StreamingContext has already been stopped") } - validate() - startSite.set(DStream.getCreationSite()) - sparkContext.setCallSite(startSite.get) - ACTIVATION_LOCK.synchronized { - assertNoOtherContextIsActive() - scheduler.start() - uiTab.foreach(_.attach()) - state = StreamingContextState.ACTIVE - setActiveContext(this) - } } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index b8247db7e8..47299513de 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() assert(ssc.getState() === StreamingContextState.ACTIVE) - intercept[SparkException] { - ssc.start() - } + ssc.start() assert(ssc.getState() === StreamingContextState.ACTIVE) } |