aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-10-15 12:26:44 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-10-15 12:26:44 -0700
commitb760d6426a7fa2a6d115cefc786aa766b9419bd6 (patch)
tree8bae9908dadead6f77f0995820a6bf4a9ff1fb21 /streaming
parent3f1aae5c71a220564adc9039dbc0e4b22aea315d (diff)
downloadspark-b760d6426a7fa2a6d115cefc786aa766b9419bd6.tar.gz
spark-b760d6426a7fa2a6d115cefc786aa766b9419bd6.tar.bz2
spark-b760d6426a7fa2a6d115cefc786aa766b9419bd6.zip
Minor modifications.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala6
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala19
-rw-r--r--streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala2
3 files changed, 14 insertions, 13 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index b38911b646..f7936bdc5f 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -25,6 +25,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) ext
assert(framework != null, "Checkpoint.framework is null")
assert(graph != null, "Checkpoint.graph is null")
assert(batchDuration != null, "Checkpoint.batchDuration is null")
+ assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
}
def saveToFile(file: String = checkpointFile) {
@@ -60,6 +61,11 @@ object Checkpoint {
throw new Exception("Checkpoint file '" + file + "' does not exist")
}
val fis = fs.open(path)
+ // ObjectInputStream uses the last defined user-defined class loader in the stack
+ // to find classes, which may be the wrong class loader. Hence, an inherited version
+ // of ObjectInputStream is used to explicitly use the current thread's default class
+ // loader to find and load classes. This is a well known Java issue and has popped up
+ // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index e072f15c93..62d21b83d9 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -184,25 +184,20 @@ class StreamingContext (
}
/**
- * This function verify whether the stream computation is eligible to be executed.
+ * This function validates whether the stream computation is eligible to be executed.
*/
- private def verify() {
- if (batchDuration == null) {
- throw new Exception("Batch duration has not been set")
- }
- if (batchDuration < Milliseconds(100)) {
- logWarning("Batch duration of " + batchDuration + " is very low")
- }
- if (graph.getOutputStreams().size == 0) {
- throw new Exception("No output streams registered, so nothing to execute")
- }
+ private def validate() {
+ assert(batchDuration != null, "Batch duration has not been set")
+ assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ assert(graph != null, "Graph is null")
+ assert(graph.getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
}
/**
* This function starts the execution of the streams.
*/
def start() {
- verify()
+ validate()
val networkInputStreams = graph.getInputStreams().filter(s => s match {
case n: NetworkInputDStream[_] => true
case _ => false
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
index cb95c36782..91ffc0c098 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -61,9 +61,9 @@ trait DStreamSuiteBase extends FunSuite with Logging {
// Setup the stream computation
val inputStream = new TestInputStream(ssc, input)
- ssc.registerInputStream(inputStream)
val operatedStream = operation(inputStream)
val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]])
+ ssc.registerInputStream(inputStream)
ssc.registerOutputStream(outputStream)
ssc
}