author     Burak Yavuz <brkyvz@gmail.com>              2015-05-21 00:30:55 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-21 00:30:55 -0700
commit     1ee8eb431e04db16f95f0bcb3a546ad6e14b616f (patch)
tree       c5341290d8254d7240dfe8ed78c245c165922a96 /streaming
parent     947ea1cf5f6986aa687631d6cf9f0fb974ee7caf (diff)
[SPARK-7745] Change asserts to requires for user input checks in Spark Streaming
Assertions can be turned off. `require` throws an `IllegalArgumentException`, which makes more sense when the value being checked is supplied by the user.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #6271 from brkyvz/streaming-require and squashes the following commits:

d249484 [Burak Yavuz] fix merge conflict
264adb8 [Burak Yavuz] addressed comments v1.0
6161350 [Burak Yavuz] fix tests
16aa766 [Burak Yavuz] changed more assertions to more meaningful errors
afd923d [Burak Yavuz] changed some assertions to require
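The distinction the commit message relies on can be shown without Spark at all. A minimal, hypothetical Scala sketch (the helper name, messages, and values are made up for illustration, not taken from the patch):

```scala
object RequireVsAssert {
  // Hypothetical validation helper, not from the patch.
  def validateBatchDuration(millis: Long): Unit = {
    // assert() is meant for internal invariants: scalac can elide it
    // (e.g. with -Xdisable-assertions) and it throws AssertionError when it does run.
    assert(millis % 100 == 0, "internal expectation about batch granularity")

    // require() is meant for caller-supplied values: it is never elided and
    // throws IllegalArgumentException with the given message.
    require(millis > 0, s"Batch duration must be positive, but was $millis ms")
  }

  def main(args: Array[String]): Unit = {
    try {
      validateBatchDuration(-500)
    } catch {
      case e: IllegalArgumentException => println(s"Rejected bad input: ${e.getMessage}")
    }
  }
}
```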
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                    |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                | 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala        |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                 | 45
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala  |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala  |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala           |  6
7 files changed, 38 insertions(+), 38 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 85b354ff4a..40789c66f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -157,10 +157,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def validate() {
this.synchronized {
- assert(batchDuration != null, "Batch duration has not been set")
+ require(batchDuration != null, "Batch duration has not been set")
// assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
// " is very low")
- assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
+ require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 95063692e1..160fc42c57 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -156,7 +156,7 @@ class StreamingContext private[streaming] (
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
- assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
+ require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
@@ -462,7 +462,8 @@ class StreamingContext private[streaming] (
directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
- assert(bytes.length == recordLength, "Byte array does not have correct length")
+ require(bytes.length == recordLength, "Byte array does not have correct length. " +
+ s"${bytes.length} did not equal recordLength: $recordLength")
bytes
}
data
@@ -568,7 +569,7 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
*
- * @throws SparkException if the StreamingContext is already stopped.
+ * @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
state match {
@@ -587,7 +588,7 @@ class StreamingContext private[streaming] (
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
- throw new SparkException("StreamingContext has already been stopped")
+ throw new IllegalStateException("StreamingContext has already been stopped")
}
}
@@ -689,7 +690,7 @@ object StreamingContext extends Logging {
private def assertNoOtherContextIsActive(): Unit = {
ACTIVATION_LOCK.synchronized {
if (activeContext.get() != null) {
- throw new SparkException(
+ throw new IllegalStateException(
"Only one StreamingContext may be started in this JVM. " +
"Currently running StreamingContext was started at" +
activeContext.get.startSite.get.longForm)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 4c28654ef6..d06401245f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -109,7 +109,7 @@ private[python] object PythonTransformFunctionSerializer {
}
def serialize(func: PythonTransformFunction): Array[Byte] = {
- assert(serializer != null, "Serializer has not been registered!")
+ require(serializer != null, "Serializer has not been registered!")
// get the id of PythonTransformFunction in py4j
val h = Proxy.getInvocationHandler(func.asInstanceOf[Proxy])
val f = h.getClass().getDeclaredField("id")
@@ -119,7 +119,7 @@ private[python] object PythonTransformFunctionSerializer {
}
def deserialize(bytes: Array[Byte]): PythonTransformFunction = {
- assert(serializer != null, "Serializer has not been registered!")
+ require(serializer != null, "Serializer has not been registered!")
serializer.loads(bytes)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7c50a766a9..c858647c64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -217,53 +217,52 @@ abstract class DStream[T: ClassTag] (
case StreamingContextState.INITIALIZED =>
// good to go
case StreamingContextState.ACTIVE =>
- throw new SparkException(
+ throw new IllegalStateException(
"Adding new inputs, transformations, and output operations after " +
"starting a context is not supported")
case StreamingContextState.STOPPED =>
- throw new SparkException(
+ throw new IllegalStateException(
"Adding new inputs, transformations, and output operations after " +
"stopping a context is not supported")
}
}
private[streaming] def validateAtStart() {
- assert(rememberDuration != null, "Remember duration is set to null")
+ require(rememberDuration != null, "Remember duration is set to null")
- assert(
+ require(
!mustCheckpoint || checkpointDuration != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
" Please use DStream.checkpoint() to set the interval."
)
- assert(
+ require(
checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
- "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
- " or SparkContext.checkpoint() to set the checkpoint directory."
+ "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
)
- assert(
+ require(
checkpointDuration == null || checkpointDuration >= slideDuration,
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
"Please set it to at least " + slideDuration + "."
)
- assert(
+ require(
checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
- "Please set it to a multiple " + slideDuration + "."
+ "Please set it to a multiple of " + slideDuration + "."
)
- assert(
+ require(
checkpointDuration == null || storageLevel != StorageLevel.NONE,
"" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
"level has not been set to enable persisting. Please use DStream.persist() to set the " +
"storage level to use memory for better checkpointing performance."
)
- assert(
+ require(
checkpointDuration == null || rememberDuration > checkpointDuration,
"The remember duration for " + this.getClass.getSimpleName + " has been set to " +
rememberDuration + " which is not more than the checkpoint interval (" +
@@ -272,7 +271,7 @@ abstract class DStream[T: ClassTag] (
val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
- assert(
+ require(
metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
@@ -633,8 +632,8 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
- // because the DStream is reachable from the outer object here, and because
- // DStreams can't be serialized with closures, we can't proactively check
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
}
@@ -644,8 +643,8 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
- // because the DStream is reachable from the outer object here, and because
- // DStreams can't be serialized with closures, we can't proactively check
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
transform((r: RDD[T], t: Time) => cleanedF(r))
@@ -656,8 +655,8 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
- // because the DStream is reachable from the outer object here, and because
- // DStreams can't be serialized with closures, we can't proactively check
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
@@ -674,8 +673,8 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = ssc.withScope {
- // because the DStream is reachable from the outer object here, and because
- // DStreams can't be serialized with closures, we can't proactively check
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = ssc.sparkContext.clean(transformFunc, false)
transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
@@ -688,8 +687,8 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = ssc.withScope {
- // because the DStream is reachable from the outer object here, and because
- // DStreams can't be serialized with closures, we can't proactively check
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = ssc.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 1385ccbf56..df9f7f140e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -40,12 +40,12 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
partitioner: Partitioner
) extends DStream[(K,V)](parent.ssc) {
- assert(_windowDuration.isMultipleOf(parent.slideDuration),
+ require(_windowDuration.isMultipleOf(parent.slideDuration),
"The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
- assert(_slideDuration.isMultipleOf(parent.slideDuration),
+ require(_slideDuration.isMultipleOf(parent.slideDuration),
"The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index a9f4147a5f..7720259a5d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -153,7 +153,7 @@ private[streaming] class ReceivedBlockTracker(
* returns only after the files are cleaned up.
*/
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
- assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
+ require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 3a958bf3a3..f8e8030791 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -182,7 +182,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.stop()
- intercept[SparkException] {
+ intercept[IllegalStateException] {
ssc.start() // start after stop should throw exception
}
assert(ssc.getState() === StreamingContextState.STOPPED)
@@ -600,7 +600,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val anotherInput = addInputStream(anotherSsc)
anotherInput.foreachRDD { rdd => rdd.count }
- val exception = intercept[SparkException] {
+ val exception = intercept[IllegalStateException] {
anotherSsc.start()
}
assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
@@ -623,7 +623,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
def testForException(clue: String, expectedErrorMsg: String)(body: => Unit): Unit = {
withClue(clue) {
- val ex = intercept[SparkException] {
+ val ex = intercept[IllegalStateException] {
body
}
assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))
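Not part of the patch: a minimal, hypothetical driver sketch of how the new exception types surface to user code on a Spark Streaming 1.x build that includes this change (object and message strings here are illustrative; the behaviors match the require in DStreamGraph.validate() and the IllegalStateException on start-after-stop exercised in StreamingContextSuite):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExceptionBehaviorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("require-sketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // With no output operations registered, start() now fails the require()
    // in DStreamGraph.validate() with IllegalArgumentException instead of AssertionError.
    try {
      ssc.start()
    } catch {
      case e: IllegalArgumentException => println(s"Invalid setup rejected: ${e.getMessage}")
    }

    ssc.stop()

    // Starting a stopped context now raises IllegalStateException instead of SparkException.
    try {
      ssc.start()
    } catch {
      case e: IllegalStateException => println(s"Lifecycle misuse rejected: ${e.getMessage}")
    }
  }
}
```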