author    Tathagata Das <tathagata.das1565@gmail.com> 2015-05-12 17:07:21 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-05-12 17:07:21 -0700
commit    23f7d66d51c8809ebc27bfbce3d95515e9b34c2e (patch)
tree      98e25455ba627a20e1fb2ac273d3487655131c02 /streaming
parent    2713bc65af1e0e81edd5fad0338e34fd127391f9 (diff)
[SPARK-7554] [STREAMING] Throw exception when an active/stopped StreamingContext is used to create DStreams and output operations
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6099 from tdas/SPARK-7554 and squashes the following commits:

2cd4158 [Tathagata Das] Throw exceptions on attempts to add stuff to active and stopped contexts.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala          |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala       | 21
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 39
3 files changed, 59 insertions(+), 3 deletions(-)
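Before the per-file diffs, here is a minimal Scala sketch of the user-facing behavior this patch enforces. It is not part of the commit; the local master URL, app name, and socket source are illustrative only.

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ActiveContextExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("spark-7554-demo")
    val ssc = new StreamingContext(conf, Seconds(1))

    // While the context is INITIALIZED, defining streams and output ops is fine.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD(rdd => println(rdd.count()))

    ssc.start()

    // After this commit, constructing a DStream on an ACTIVE context throws a
    // SparkException instead of silently creating an operation that never runs.
    try {
      lines.map(_.length)
    } catch {
      case e: SparkException => println("Rejected as expected: " + e.getMessage)
    }

    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}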
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 9c7f698840..85b354ff4a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -45,7 +45,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
- outputStreams.foreach(_.validate)
+ outputStreams.foreach(_.validateAtStart)
inputStreams.par.foreach(_.start())
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7092a3d3f0..64de7526a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -60,6 +60,8 @@ abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
+ validateAtInit()
+
// =======================================================================
// Methods that should be implemented by subclasses of DStream
// =======================================================================
@@ -171,7 +173,22 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.initialize(zeroTime))
}
- private[streaming] def validate() {
+ private def validateAtInit(): Unit = {
+ ssc.getState() match {
+ case StreamingContextState.INITIALIZED =>
+ // good to go
+ case StreamingContextState.ACTIVE =>
+ throw new SparkException(
+ "Adding new inputs, transformations, and output operations after " +
+ "starting a context is not supported")
+ case StreamingContextState.STOPPED =>
+ throw new SparkException(
+ "Adding new inputs, transformations, and output operations after " +
+ "stopping a context is not supported")
+ }
+ }
+
+ private[streaming] def validateAtStart() {
assert(rememberDuration != null, "Remember duration is set to null")
assert(
@@ -226,7 +243,7 @@ abstract class DStream[T: ClassTag] (
math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
)
- dependencies.foreach(_.validate())
+ dependencies.foreach(_.validateAtStart())
logInfo("Slide time = " + slideDuration)
logInfo("Storage level = " + storageLevel)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5d09b234f7..5f93332896 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -651,6 +651,45 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
testPackage.test()
}
+ test("throw exception on using active or stopped context") {
+ val conf = new SparkConf()
+ .setMaster(master)
+ .setAppName(appName)
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
+ ssc = new StreamingContext(conf, batchDuration)
+ require(ssc.getState() === StreamingContextState.INITIALIZED)
+ val input = addInputStream(ssc)
+ val transformed = input.map { x => x}
+ transformed.foreachRDD { rdd => rdd.count }
+
+ def testForException(clue: String, expectedErrorMsg: String)(body: => Unit): Unit = {
+ withClue(clue) {
+ val ex = intercept[SparkException] {
+ body
+ }
+ assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))
+ }
+ }
+
+ ssc.start()
+ require(ssc.getState() === StreamingContextState.ACTIVE)
+ testForException("no error on adding input after start", "start") {
+ addInputStream(ssc) }
+ testForException("no error on adding transformation after start", "start") {
+ input.map { x => x * 2 } }
+ testForException("no error on adding output operation after start", "start") {
+ transformed.foreachRDD { rdd => rdd.collect() } }
+
+ ssc.stop()
+ require(ssc.getState() === StreamingContextState.STOPPED)
+ testForException("no error on adding input after stop", "stop") {
+ addInputStream(ssc) }
+ testForException("no error on adding transformation after stop", "stop") {
+ input.map { x => x * 2 } }
+ testForException("no error on adding output operation after stop", "stop") {
+ transformed.foreachRDD { rdd => rdd.collect() } }
+ }
+
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)