diff options
Diffstat (limited to 'streaming/src/test')
4 files changed, 232 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index ee6b433d1f..2e3a1e66ad 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -375,11 +375,7 @@ class BasicOperationsSuite extends TestSuiteBase { } test("slice") { - val conf2 = new SparkConf() - .setMaster("local[2]") - .setAppName("BasicOperationsSuite") - .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1)) + val ssc = new StreamingContext(conf, Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6499de98c9..9590bca989 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.FileInputDStream import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.util.Utils +import org.apache.spark.SparkConf /** * This test suites tests the checkpointing functionality of DStreams - @@ -142,6 +144,26 @@ class CheckpointSuite extends TestSuiteBase { ssc = null } + // This tests whether spark conf persists through checkpoints, and certain + // configs gets scrubbed + test("persistence of conf through checkpoints") { + val key = "spark.mykey" + val value = "myvalue" + System.setProperty(key, value) + ssc = new StreamingContext(master, framework, 
batchDuration) + val cp = new Checkpoint(ssc, Time(1000)) + assert(!cp.sparkConf.contains("spark.driver.host")) + assert(!cp.sparkConf.contains("spark.driver.port")) + assert(!cp.sparkConf.contains("spark.hostPort")) + assert(cp.sparkConf.get(key) === value) + ssc.stop() + val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) + assert(!newCp.sparkConf.contains("spark.driver.host")) + assert(!newCp.sparkConf.contains("spark.driver.port")) + assert(!newCp.sparkConf.contains("spark.hostPort")) + assert(newCp.sparkConf.get(key) === value) + } + // This tests whether the systm can recover from a master failure with simple // non-stateful operations. This assumes as reliable, replayable input diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala new file mode 100644 index 0000000000..9eb9b3684c --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.streaming
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.{SparkException, SparkConf, SparkContext}
+import org.apache.spark.util.{Utils, MetadataCleaner}
+
+/**
+ * Tests for constructing, starting, stopping and waiting on a StreamingContext.
+ *
+ * The suite-level `ssc` and `sc` handles are shared by all tests; whatever a
+ * test leaves in them is stopped by the `after` hook.
+ */
+class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
+
+  val master = "local[2]"
+  val appName = this.getClass.getSimpleName
+  val batchDuration = Seconds(1)
+  val sparkHome = "someDir"
+  val envPair = "key" -> "value"
+  // Deliberately different from the default so an explicitly set TTL can be
+  // told apart from the default one.
+  val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
+
+  var sc: SparkContext = null
+  var ssc: StreamingContext = null
+
+  before {
+    // Make sure no TTL system property is set when a test starts.
+    System.clearProperty("spark.cleaner.ttl")
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    // Tests that create or keep a SparkContext of their own store it in `sc`;
+    // stop it here so it does not leak into the following tests.
+    // NOTE(review): when `ssc` wrapped the same context this may stop it a
+    // second time - assumes SparkContext.stop() tolerates that; confirm.
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+  }
+
+  test("from no conf constructor") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    assert(ssc.sparkContext.conf.get("spark.master") === master)
+    assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
+    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
+      StreamingContext.DEFAULT_CLEANER_TTL)
+  }
+
+  test("from no conf + spark home") {
+    ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
+    assert(ssc.conf.get("spark.home") === sparkHome)
+    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
+      StreamingContext.DEFAULT_CLEANER_TTL)
+  }
+
+  test("from no conf + spark home + env") {
+    ssc = new StreamingContext(master, appName, batchDuration,
+      sparkHome, Nil, Map(envPair))
+    assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
+    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
+      StreamingContext.DEFAULT_CLEANER_TTL)
+  }
+
+  test("from conf without ttl set") {
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    ssc = new StreamingContext(myConf, batchDuration)
+    assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
+      StreamingContext.DEFAULT_CLEANER_TTL)
+  }
+
+  test("from conf with ttl set") {
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    myConf.set("spark.cleaner.ttl", ttl.toString)
+    ssc = new StreamingContext(myConf, batchDuration)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+  }
+
+  test("from existing SparkContext without ttl set") {
+    // `sc` is kept in the suite-level var so that `after` stops it even though
+    // the StreamingContext constructor is expected to throw.
+    sc = new SparkContext(master, appName)
+    val exception = intercept[SparkException] {
+      ssc = new StreamingContext(sc, batchDuration)
+    }
+    assert(exception.getMessage.contains("ttl"))
+  }
+
+  test("from existing SparkContext with ttl set") {
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    myConf.set("spark.cleaner.ttl", ttl.toString)
+    // Build the SparkContext first so that the constructor taking an existing
+    // SparkContext is the one under test (not the conf-based one).
+    sc = new SparkContext(myConf)
+    ssc = new StreamingContext(sc, batchDuration)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+  }
+
+  test("from checkpoint") {
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    myConf.set("spark.cleaner.ttl", ttl.toString)
+    val ssc1 = new StreamingContext(myConf, batchDuration)
+    val cp = new Checkpoint(ssc1, Time(1000))
+    assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+    ssc1.stop()
+    val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+    assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+    // Recreate the context from the checkpoint that went through
+    // serialization, so the round-tripped conf is what gets verified.
+    ssc = new StreamingContext(null, newCp, null)
+    assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+  }
+
+  test("start multiple times") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register
+
+    ssc.start()
+    intercept[SparkException] {
+      ssc.start()
+    }
+  }
+
+  test("stop multiple times") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    ssc.stop()
+    ssc.stop()
+    ssc = null
+  }
+
+  test("stop only streaming context") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    // Keep a handle on the underlying SparkContext: it is the object the
+    // assertion below exercises, and `after` stops it once the test is done.
+    sc = ssc.sparkContext
+    // NOTE(review): `false` presumably means "leave the SparkContext running",
+    // as the test name suggests - confirm against StreamingContext.stop.
+    ssc.stop(false)
+    ssc = null
+    assert(sc.makeRDD(1 to 100).collect().size === 100)
+  }
+
+  test("waitForStop") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    val inputStream = addInputStream(ssc)
+    inputStream.map(x => x).register
+
+    // test whether start() blocks indefinitely or not
+    failAfter(2000 millis) {
+      ssc.start()
+    }
+
+    // test whether waitForStop() exits after the given amount of time
+    failAfter(1000 millis) {
+      ssc.waitForStop(500)
+    }
+
+    // test whether waitForStop() does not exit if no time is given
+    val exception = intercept[Exception] {
+      failAfter(1000 millis) {
+        ssc.waitForStop()
+        // Only reached if waitForStop() returned before the timeout fired.
+        throw new Exception("Did not wait for stop")
+      }
+    }
+    assert(exception.isInstanceOf[TestFailedDueToTimeoutException], "Did not wait for stop")
+
+    // test whether wait exits if context is stopped
+    failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
+      new Thread() {
+        override def run() {
+          Thread.sleep(500)
+          ssc.stop()
+        }
+      }.start()
+      ssc.waitForStop()
+    }
+  }
+
+  test("waitForStop with error in task") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    val inputStream = addInputStream(ssc)
+    inputStream.map(x => { throw new TestException("error in map task"); x})
+               .foreach(_.count)
+
+    val exception = intercept[Exception] {
+      ssc.start()
+      ssc.waitForStop(5000)
+    }
+    assert(exception.getMessage.contains("map task"), "Expected exception not thrown")
+  }
+
+  test("waitForStop with error in job generation") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    val inputStream = addInputStream(ssc)
+
+    inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
+    val exception = intercept[TestException] {
+      ssc.start()
+      ssc.waitForStop(5000)
+    }
+    assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
+  }
+
+  /**
+   * Creates a TestInputStream over 100 batches (batch i holds 1 to i),
+   * registers it as an input stream of `s` and returns it.
+   */
+  def addInputStream(s: StreamingContext): DStream[Int] = {
+    val input = (1 to 100).map(i => (1 to i))
+    val inputStream = new TestInputStream(s, input, 1)
+    s.registerInputStream(inputStream)
+    inputStream
+  }
+}
+
+/** Exception type thrown on purpose by the error-handling tests above. */
+class TestException(msg: String) extends Exception(msg)
\ No newline at end of file diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index b20d02f996..3569624d51 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -137,7 +137,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val conf = new SparkConf() .setMaster(master) .setAppName(framework) - .set("spark.cleaner.ttl", "3600") + .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString) // Default before function for any streaming test suite. Override this // if you want to add your stuff to "before" (i.e., don't call before { } ) |