aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-12 17:07:21 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-12 17:07:21 -0700
commit23f7d66d51c8809ebc27bfbce3d95515e9b34c2e (patch)
tree98e25455ba627a20e1fb2ac273d3487655131c02 /streaming/src/main
parent2713bc65af1e0e81edd5fad0338e34fd127391f9 (diff)
downloadspark-23f7d66d51c8809ebc27bfbce3d95515e9b34c2e.tar.gz
spark-23f7d66d51c8809ebc27bfbce3d95515e9b34c2e.tar.bz2
spark-23f7d66d51c8809ebc27bfbce3d95515e9b34c2e.zip
[SPARK-7554] [STREAMING] Throw exception when an active/stopped StreamingContext is used to create DStreams and output operations
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6099 from tdas/SPARK-7554 and squashes the following commits: 2cd4158 [Tathagata Das] Throw exceptions on attempts to add stuff to active and stopped contexts.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala21
2 files changed, 20 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 9c7f698840..85b354ff4a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -45,7 +45,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
- outputStreams.foreach(_.validate)
+ outputStreams.foreach(_.validateAtStart)
inputStreams.par.foreach(_.start())
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7092a3d3f0..64de7526a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -60,6 +60,8 @@ abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
+ validateAtInit()
+
// =======================================================================
// Methods that should be implemented by subclasses of DStream
// =======================================================================
@@ -171,7 +173,22 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.initialize(zeroTime))
}
- private[streaming] def validate() {
+ private def validateAtInit(): Unit = {
+ ssc.getState() match {
+ case StreamingContextState.INITIALIZED =>
+ // good to go
+ case StreamingContextState.ACTIVE =>
+ throw new SparkException(
+ "Adding new inputs, transformations, and output operations after " +
+ "starting a context is not supported")
+ case StreamingContextState.STOPPED =>
+ throw new SparkException(
+ "Adding new inputs, transformations, and output operations after " +
+ "stopping a context is not supported")
+ }
+ }
+
+ private[streaming] def validateAtStart() {
assert(rememberDuration != null, "Remember duration is set to null")
assert(
@@ -226,7 +243,7 @@ abstract class DStream[T: ClassTag] (
math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
)
- dependencies.foreach(_.validate())
+ dependencies.foreach(_.validateAtStart())
logInfo("Slide time = " + slideDuration)
logInfo("Storage level = " + storageLevel)