diff options
author | Guillaume Poulin <guillaume@hopper.com> | 2015-10-03 12:14:00 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-10-03 12:14:00 +0100 |
commit | be0dcd6eb120491bca62d65a11c476401f9932c1 (patch) | |
tree | 69544fdffc911b05940348705557ccd6b908c699 /streaming | |
parent | 107320c9bbfe2496963a4e75e60fd6ba7fbfbabc (diff) | |
download | spark-be0dcd6eb120491bca62d65a11c476401f9932c1.tar.gz spark-be0dcd6eb120491bca62d65a11c476401f9932c1.tar.bz2 spark-be0dcd6eb120491bca62d65a11c476401f9932c1.zip |
FIX: rememberDuration reassignment error message
I was reading throught the scheduler and found this small mistake.
Author: Guillaume Poulin <guillaume@hopper.com>
Closes #8966 from gpoulin/remember_duration_typo.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 16 |
1 files changed, 5 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 40789c66f3..ebbcb6b778 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -38,9 +38,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def start(time: Time) { this.synchronized { - if (zeroTime != null) { - throw new Exception("DStream graph computation already started") - } + require(zeroTime == null, "DStream graph computation already started") zeroTime = time startTime = time outputStreams.foreach(_.initialize(zeroTime)) @@ -68,20 +66,16 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def setBatchDuration(duration: Duration) { this.synchronized { - if (batchDuration != null) { - throw new Exception("Batch duration already set as " + batchDuration + - ". cannot set it again.") - } + require(batchDuration == null, + s"Batch duration already set as $batchDuration. Cannot set it again.") batchDuration = duration } } def remember(duration: Duration) { this.synchronized { - if (rememberDuration != null) { - throw new Exception("Remember duration already set as " + batchDuration + - ". cannot set it again.") - } + require(rememberDuration == null, + s"Remember duration already set as $rememberDuration. Cannot set it again.") rememberDuration = duration } } |