From 1ee8eb431e04db16f95f0bcb3a546ad6e14b616f Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 21 May 2015 00:30:55 -0700
Subject: [SPARK-7745] Change asserts to requires for user input checks in Spark Streaming

Assertions can be turned off. `require` throws an `IllegalArgumentException` which makes
more sense when it's a user set variable.

Author: Burak Yavuz

Closes #6271 from brkyvz/streaming-require and squashes the following commits:

d249484 [Burak Yavuz] fix merge conflict
264adb8 [Burak Yavuz] addressed comments v1.0
6161350 [Burak Yavuz] fix tests
16aa766 [Burak Yavuz] changed more assertions to more meaningful errors
afd923d [Burak Yavuz] changed some assertions to require
---
 .../org/apache/spark/streaming/DStreamGraph.scala  |  4 +-
 .../apache/spark/streaming/StreamingContext.scala  | 11 +++---
 .../spark/streaming/api/python/PythonDStream.scala |  4 +-
 .../apache/spark/streaming/dstream/DStream.scala   | 45 +++++++++++-----------
 .../streaming/dstream/ReducedWindowedDStream.scala |  4 +-
 .../streaming/scheduler/ReceivedBlockTracker.scala |  2 +-
 .../spark/streaming/StreamingContextSuite.scala    |  6 +--
 7 files changed, 38 insertions(+), 38 deletions(-)
(limited to 'streaming')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 85b354ff4a..40789c66f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -157,10 +157,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
 
   def validate() {
     this.synchronized {
-      assert(batchDuration != null, "Batch duration has not been set")
+      require(batchDuration != null, "Batch duration has not been set")
       // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
       // " is very low")
-      assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+      require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 95063692e1..160fc42c57 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -156,7 +156,7 @@ class StreamingContext private[streaming] (
       cp_.graph.restoreCheckpointData()
       cp_.graph
     } else {
-      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
       val newGraph = new DStreamGraph()
       newGraph.setBatchDuration(batchDur_)
       newGraph
@@ -462,7 +462,8 @@ class StreamingContext private[streaming] (
       directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
     val data = br.map { case (k, v) =>
       val bytes = v.getBytes
-      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      require(bytes.length == recordLength, "Byte array does not have correct length. " +
+        s"${bytes.length} did not equal recordLength: $recordLength")
       bytes
     }
     data
@@ -568,7 +569,7 @@ class StreamingContext private[streaming] (
   /**
    * Start the execution of the streams.
    *
-   * @throws SparkException if the StreamingContext is already stopped.
+   * @throws IllegalStateException if the StreamingContext is already stopped.
    */
   def start(): Unit = synchronized {
     state match {
@@ -587,7 +588,7 @@ class StreamingContext private[streaming] (
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
       case STOPPED =>
-        throw new SparkException("StreamingContext has already been stopped")
+        throw new IllegalStateException("StreamingContext has already been stopped")
     }
   }
 
@@ -689,7 +690,7 @@ object StreamingContext extends Logging {
   private def assertNoOtherContextIsActive(): Unit = {
     ACTIVATION_LOCK.synchronized {
       if (activeContext.get() != null) {
-        throw new SparkException(
+        throw new IllegalStateException(
           "Only one StreamingContext may be started in this JVM. " +
             "Currently running StreamingContext was started at" +
             activeContext.get.startSite.get.longForm)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 4c28654ef6..d06401245f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -109,7 +109,7 @@ private[python] object PythonTransformFunctionSerializer {
   }
 
   def serialize(func: PythonTransformFunction): Array[Byte] = {
-    assert(serializer != null, "Serializer has not been registered!")
+    require(serializer != null, "Serializer has not been registered!")
     // get the id of PythonTransformFunction in py4j
     val h = Proxy.getInvocationHandler(func.asInstanceOf[Proxy])
     val f = h.getClass().getDeclaredField("id")
@@ -119,7 +119,7 @@ private[python] object PythonTransformFunctionSerializer {
   }
 
   def deserialize(bytes: Array[Byte]): PythonTransformFunction = {
-    assert(serializer != null, "Serializer has not been registered!")
+    require(serializer != null, "Serializer has not been registered!")
     serializer.loads(bytes)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7c50a766a9..c858647c64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -217,53 +217,52 @@ abstract class DStream[T: ClassTag] (
       case StreamingContextState.INITIALIZED =>
         // good to go
       case StreamingContextState.ACTIVE =>
-        throw new SparkException(
+        throw new IllegalStateException(
           "Adding new inputs, transformations, and output operations after " +
             "starting a context is not supported")
       case StreamingContextState.STOPPED =>
-        throw new SparkException(
+        throw new IllegalStateException(
           "Adding new inputs, transformations, and output operations after " +
             "stopping a context is not supported")
     }
   }
 
   private[streaming] def validateAtStart() {
-    assert(rememberDuration != null, "Remember duration is set to null")
+    require(rememberDuration != null, "Remember duration is set to null")
 
-    assert(
+    require(
       !mustCheckpoint || checkpointDuration != null,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
         " Please use DStream.checkpoint() to set the interval."
     )
 
-    assert(
+    require(
       checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
-      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
-      " or SparkContext.checkpoint() to set the checkpoint directory."
+      "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
     )
 
-    assert(
+    require(
       checkpointDuration == null || checkpointDuration >= slideDuration,
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
         checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
         "Please set it to at least " + slideDuration + "."
     )
 
-    assert(
+    require(
       checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
       "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
         checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
-        "Please set it to a multiple " + slideDuration + "."
+        "Please set it to a multiple of " + slideDuration + "."
     )
 
-    assert(
+    require(
       checkpointDuration == null || storageLevel != StorageLevel.NONE,
       "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
         "level has not been set to enable persisting. Please use DStream.persist() to set the " +
         "storage level to use memory for better checkpointing performance."
     )
 
-    assert(
+    require(
       checkpointDuration == null || rememberDuration > checkpointDuration,
       "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
         rememberDuration + " which is not more than the checkpoint interval (" +
@@ -272,7 +271,7 @@ abstract class DStream[T: ClassTag] (
 
     val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
     logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
-    assert(
+    require(
       metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
       "It seems you are doing some DStream window operation or setting a checkpoint interval " +
         "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
@@ -633,8 +632,8 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
   }
@@ -644,8 +643,8 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     transform((r: RDD[T], t: Time) => cleanedF(r))
@@ -656,8 +655,8 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
@@ -674,8 +673,8 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
     ): DStream[V] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
@@ -688,8 +687,8 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = ssc.withScope {
-    // because the DStream is reachable from the outer object here, and because 
-    // DStreams can't be serialized with closures, we can't proactively check 
+    // because the DStream is reachable from the outer object here, and because
+    // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 1385ccbf56..df9f7f140e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -40,12 +40,12 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     partitioner: Partitioner
   ) extends DStream[(K,V)](parent.ssc) {
 
-  assert(_windowDuration.isMultipleOf(parent.slideDuration),
+  require(_windowDuration.isMultipleOf(parent.slideDuration),
     "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
 
-  assert(_slideDuration.isMultipleOf(parent.slideDuration),
+  require(_slideDuration.isMultipleOf(parent.slideDuration),
     "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
   )
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index a9f4147a5f..7720259a5d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -153,7 +153,7 @@ private[streaming] class ReceivedBlockTracker(
    * returns only after the files are cleaned up.
    */
   def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
-    assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
+    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3a958bf3a3..f8e8030791 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -182,7 +182,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop()
-    intercept[SparkException] {
+    intercept[IllegalStateException] {
       ssc.start() // start after stop should throw exception
     }
     assert(ssc.getState() === StreamingContextState.STOPPED)
@@ -600,7 +600,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     val anotherInput = addInputStream(anotherSsc)
     anotherInput.foreachRDD { rdd => rdd.count }
 
-    val exception = intercept[SparkException] {
+    val exception = intercept[IllegalStateException] {
       anotherSsc.start()
     }
     assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
@@ -623,7 +623,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
     def testForException(clue: String, expectedErrorMsg: String)(body: => Unit): Unit = {
      withClue(clue) {
-        val ex = intercept[SparkException] {
+        val ex = intercept[IllegalStateException] {
          body
        }
        assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))
-- 
cgit v1.2.3
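
The behavioural difference the commit message relies on can be seen in a small standalone sketch. It is illustrative only and not part of the patch; the object and method names are made up. Scala's `Predef.assert` is marked `@elidable`, so a build compiled with `-Xdisable-assertions` skips the check entirely, and when it does run it throws `AssertionError`. `Predef.require` always runs and throws `IllegalArgumentException`, the conventional exception for an invalid caller-supplied argument.

import scala.util.Try

object RequireVsAssert {

  // Hypothetical helpers (not from the patch) validating a user-supplied value.

  // assert is @elidable: with -Xdisable-assertions the check is removed and
  // bad input slips through; otherwise it throws java.lang.AssertionError.
  def withAssert(batchDurationMs: Long): Long = {
    assert(batchDurationMs > 0, "Batch duration has not been set")
    batchDurationMs
  }

  // require is never elided and throws IllegalArgumentException.
  def withRequire(batchDurationMs: Long): Long = {
    require(batchDurationMs > 0, "Batch duration has not been set")
    batchDurationMs
  }

  def main(args: Array[String]): Unit = {
    println(Try(withAssert(-1)))
    // Failure(java.lang.AssertionError: assertion failed: Batch duration has not been set)
    // or Success(-1) in a build where assertions are elided.
    println(Try(withRequire(-1)))
    // Failure(java.lang.IllegalArgumentException: requirement failed: Batch duration has not been set)
  }
}

Run normally, the first call fails with AssertionError and the second with IllegalArgumentException; recompiled with -Xdisable-assertions, the assert-based check silently accepts the invalid value, which is why the user-input checks in this patch move to require.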