diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-15 12:26:44 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-15 12:26:44 -0700 |
commit | b760d6426a7fa2a6d115cefc786aa766b9419bd6 (patch) | |
tree | 8bae9908dadead6f77f0995820a6bf4a9ff1fb21 /streaming/src/main | |
parent | 3f1aae5c71a220564adc9039dbc0e4b22aea315d (diff) | |
download | spark-b760d6426a7fa2a6d115cefc786aa766b9419bd6.tar.gz spark-b760d6426a7fa2a6d115cefc786aa766b9419bd6.tar.bz2 spark-b760d6426a7fa2a6d115cefc786aa766b9419bd6.zip |
Minor modifications.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/Checkpoint.scala | 6 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 19 |
2 files changed, 13 insertions, 12 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index b38911b646..f7936bdc5f 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -25,6 +25,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) ext assert(framework != null, "Checkpoint.framework is null") assert(graph != null, "Checkpoint.graph is null") assert(batchDuration != null, "Checkpoint.batchDuration is null") + assert(checkpointTime != null, "Checkpoint.checkpointTime is null") } def saveToFile(file: String = checkpointFile) { @@ -60,6 +61,11 @@ object Checkpoint { throw new Exception("Checkpoint file '" + file + "' does not exist") } val fis = fs.open(path) + // ObjectInputStream uses the last defined user-defined class loader in the stack + // to find classes, which may be the wrong class loader. Hence, an inherited version + // of ObjectInputStream is used to explicitly use the current thread's default class + // loader to find and load classes. This is a well known Java issue and has popped up + // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index e072f15c93..62d21b83d9 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -184,25 +184,20 @@ class StreamingContext ( } /** - * This function verify whether the stream computation is eligible to be executed. + * This function validates whether the stream computation is eligible to be executed. 
*/ - private def verify() { - if (batchDuration == null) { - throw new Exception("Batch duration has not been set") - } - if (batchDuration < Milliseconds(100)) { - logWarning("Batch duration of " + batchDuration + " is very low") - } - if (graph.getOutputStreams().size == 0) { - throw new Exception("No output streams registered, so nothing to execute") - } + private def validate() { + assert(batchDuration != null, "Batch duration has not been set") + assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low") + assert(graph != null, "Graph is null") + assert(graph.getOutputStreams().size > 0, "No output streams registered, so nothing to execute") } /** * This function starts the execution of the streams. */ def start() { - verify() + validate() val networkInputStreams = graph.getInputStreams().filter(s => s match { case n: NetworkInputDStream[_] => true case _ => false |