aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-12 08:48:24 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-12 08:48:24 -0700
commitec6f2a9774167014566fb9608ee4394d2ce5fd6a (patch)
tree61b2427df3f18edb03a1312f6fa9457f8f83b890 /streaming
parentf3e8e60063ccf0d713d03e671a3231560475f90d (diff)
downloadspark-ec6f2a9774167014566fb9608ee4394d2ce5fd6a.tar.gz
spark-ec6f2a9774167014566fb9608ee4394d2ce5fd6a.tar.bz2
spark-ec6f2a9774167014566fb9608ee4394d2ce5fd6a.zip
[SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6060 from tdas/SPARK-7532 and squashes the following commits: 6fe2e83 [Tathagata Das] Update docs 7dadfc3 [Tathagata Das] Fixed bug again 99c7678 [Tathagata Das] Added logInfo 65aec20 [Tathagata Das] Fix bug 5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532 1a9a818 [Tathagata Das] Fix scaladoc c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala27
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala4
2 files changed, 14 insertions, 17 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 2c5834defa..8461e90120 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -528,28 +528,27 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
*
- * @throws SparkException if the context has already been started or stopped.
+ * @throws SparkException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
- import StreamingContext._
state match {
case INITIALIZED =>
- // good to start
+ validate()
+ startSite.set(DStream.getCreationSite())
+ sparkContext.setCallSite(startSite.get)
+ StreamingContext.ACTIVATION_LOCK.synchronized {
+ StreamingContext.assertNoOtherContextIsActive()
+ scheduler.start()
+ uiTab.foreach(_.attach())
+ state = StreamingContextState.ACTIVE
+ StreamingContext.setActiveContext(this)
+ }
+ logInfo("StreamingContext started")
case ACTIVE =>
- throw new SparkException("StreamingContext has already been started")
+ logWarning("StreamingContext has already been started")
case STOPPED =>
throw new SparkException("StreamingContext has already been stopped")
}
- validate()
- startSite.set(DStream.getCreationSite())
- sparkContext.setCallSite(startSite.get)
- ACTIVATION_LOCK.synchronized {
- assertNoOtherContextIsActive()
- scheduler.start()
- uiTab.foreach(_.attach())
- state = StreamingContextState.ACTIVE
- setActiveContext(this)
- }
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index b8247db7e8..47299513de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
addInputStream(ssc).register()
ssc.start()
assert(ssc.getState() === StreamingContextState.ACTIVE)
- intercept[SparkException] {
- ssc.start()
- }
+ ssc.start()
assert(ssc.getState() === StreamingContextState.ACTIVE)
}