aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorGuillaume Poulin <guillaume@hopper.com>2015-10-03 12:14:00 +0100
committerSean Owen <sowen@cloudera.com>2015-10-03 12:14:00 +0100
commitbe0dcd6eb120491bca62d65a11c476401f9932c1 (patch)
tree69544fdffc911b05940348705557ccd6b908c699 /streaming
parent107320c9bbfe2496963a4e75e60fd6ba7fbfbabc (diff)
downloadspark-be0dcd6eb120491bca62d65a11c476401f9932c1.tar.gz
spark-be0dcd6eb120491bca62d65a11c476401f9932c1.tar.bz2
spark-be0dcd6eb120491bca62d65a11c476401f9932c1.zip
FIX: rememberDuration reassignment error message
I was reading throught the scheduler and found this small mistake. Author: Guillaume Poulin <guillaume@hopper.com> Closes #8966 from gpoulin/remember_duration_typo.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala16
1 files changed, 5 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 40789c66f3..ebbcb6b778 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -38,9 +38,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def start(time: Time) {
this.synchronized {
- if (zeroTime != null) {
- throw new Exception("DStream graph computation already started")
- }
+ require(zeroTime == null, "DStream graph computation already started")
zeroTime = time
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
@@ -68,20 +66,16 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def setBatchDuration(duration: Duration) {
this.synchronized {
- if (batchDuration != null) {
- throw new Exception("Batch duration already set as " + batchDuration +
- ". cannot set it again.")
- }
+ require(batchDuration == null,
+ s"Batch duration already set as $batchDuration. Cannot set it again.")
batchDuration = duration
}
}
def remember(duration: Duration) {
this.synchronized {
- if (rememberDuration != null) {
- throw new Exception("Remember duration already set as " + batchDuration +
- ". cannot set it again.")
- }
+ require(rememberDuration == null,
+ s"Remember duration already set as $rememberDuration. Cannot set it again.")
rememberDuration = duration
}
}