From 058797c1722c9251f6bc6ad2672cb0e79146b04f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 25 Apr 2014 19:04:34 -0700 Subject: [Spark-1382] Fix NPE in DStream.slice (updated version of #365) @zsxwing I cherry-picked your changes and merged the master. #365 had some conflicts once again! Author: zsxwing Author: Tathagata Das Closes #562 from tdas/SPARK-1382 and squashes the following commits: e2962c1 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-1382 20968d9 [zsxwing] Replace Exception with SparkException in DStream e476651 [zsxwing] Merge remote-tracking branch 'origin/master' into SPARK-1382 35ba56a [zsxwing] SPARK-1382: Fix NPE in DStream.slice --- .../apache/spark/streaming/dstream/DStream.scala | 22 ++++++++++++---------- .../spark/streaming/BasicOperationsSuite.scala | 12 +++++++++++- 2 files changed, 23 insertions(+), 11 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index f69f69e0c4..4709a62381 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -18,20 +18,19 @@ package org.apache.spark.streaming.dstream +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + import scala.deprecated import scala.collection.mutable.HashMap import scala.reflect.ClassTag -import java.io.{IOException, ObjectInputStream, ObjectOutputStream} - -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkException} import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.streaming.Duration +import org.apache.spark.util.MetadataCleaner /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -144,7 +143,7 @@ abstract class DStream[T: ClassTag] ( */ private[streaming] def initialize(time: Time) { if (zeroTime != null && zeroTime != time) { - throw new Exception("ZeroTime is already initialized to " + zeroTime + throw new SparkException("ZeroTime is already initialized to " + zeroTime + ", cannot initialize it again to " + time) } zeroTime = time @@ -220,7 +219,7 @@ abstract class DStream[T: ClassTag] ( "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + - "set the Java property 'spark.cleaner.delay' to more than " + + "set the Java cleaner delay to more than " + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." ) @@ -235,7 +234,7 @@ abstract class DStream[T: ClassTag] ( private[streaming] def setContext(s: StreamingContext) { if (ssc != null && ssc != s) { - throw new Exception("Context is already set in " + this + ", cannot set it again") + throw new SparkException("Context is already set in " + this + ", cannot set it again") } ssc = s logInfo("Set context for " + this) @@ -244,7 +243,7 @@ abstract class DStream[T: ClassTag] ( private[streaming] def setGraph(g: DStreamGraph) { if (graph != null && graph != g) { - throw new Exception("Graph is already set in " + this + ", cannot set it again") + throw new SparkException("Graph is already set in " + this + ", cannot set it again") } graph = g dependencies.foreach(_.setGraph(graph)) @@ -261,7 +260,7 @@ abstract class DStream[T: ClassTag] ( /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ private[streaming] def isTimeValid(time: Time): Boolean = { if (!isInitialized) { - throw new Exception (this + " has not been initialized") + throw new SparkException (this + " has not been initialized") } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) @@ -728,6 +727,9 @@ abstract class DStream[T: ClassTag] ( * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { + if (!isInitialized) { + throw new SparkException(this + " has not been initialized") + } if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 4792ca1f8a..04925886c3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.SparkContext._ import util.ManualClock -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkException, SparkConf} import org.apache.spark.streaming.dstream.{WindowedDStream, DStream} import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag @@ -398,6 +398,16 @@ class BasicOperationsSuite extends TestSuiteBase { Thread.sleep(1000) } + test("slice - has not been initialized") { + val ssc = new StreamingContext(conf, Seconds(1)) + val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) + val stream = new TestInputStream[Int](ssc, input, 2) + val thrown = intercept[SparkException] { + stream.slice(new Time(0), new Time(1000)) + } + assert(thrown.getMessage.contains("has not been initialized")) + } + val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq test("rdd cleanup - map and window") { -- cgit v1.2.3