diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 16 |
1 files changed, 5 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 40789c66f3..ebbcb6b778 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -38,9 +38,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def start(time: Time) { this.synchronized { - if (zeroTime != null) { - throw new Exception("DStream graph computation already started") - } + require(zeroTime == null, "DStream graph computation already started") zeroTime = time startTime = time outputStreams.foreach(_.initialize(zeroTime)) @@ -68,20 +66,16 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def setBatchDuration(duration: Duration) { this.synchronized { - if (batchDuration != null) { - throw new Exception("Batch duration already set as " + batchDuration + - ". cannot set it again.") - } + require(batchDuration == null, + s"Batch duration already set as $batchDuration. Cannot set it again.") batchDuration = duration } } def remember(duration: Duration) { this.synchronized { - if (rememberDuration != null) { - throw new Exception("Remember duration already set as " + batchDuration + - ". cannot set it again.") - } + require(rememberDuration == null, + s"Remember duration already set as $rememberDuration. Cannot set it again.") rememberDuration = duration } } |