author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-11 23:15:09 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-11 23:15:09 -0800
commit    f5108ffc24eccd21f5d6dc4114ea47b0ab14ab14 (patch)
tree      2b446a3e73398929ab01491e16adcba7ec654736 /streaming/src/test
parent    4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d (diff)
Converted JobScheduler to use actors for event handling. Changed protected[streaming] to private[streaming] in StreamingContext and DStream. Added waitForStop to StreamingContext, and StreamingContextSuite.
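The commit message above describes routing JobScheduler's events through an actor, so event handling is serialized on a single mailbox rather than through synchronized methods. A minimal sketch of that pattern, assuming plain Akka 2.x APIs of this era; the event types and handler names are illustrative, not the actual JobScheduler code:

    import akka.actor.{Actor, ActorSystem, Props}

    // Hypothetical event ADT standing in for the scheduler's real events.
    sealed trait JobSchedulerEvent
    case class JobStarted(jobId: Int) extends JobSchedulerEvent
    case class JobCompleted(jobId: Int) extends JobSchedulerEvent

    // All events arrive on one mailbox, so handlers run one at a time
    // without explicit locking.
    class EventHandlerActor extends Actor {
      def receive = {
        case JobStarted(id)   => println(s"job $id started")
        case JobCompleted(id) => println(s"job $id completed")
      }
    }

    object EventHandlingDemo extends App {
      val system = ActorSystem("scheduler")
      val handler = system.actorOf(Props[EventHandlerActor], "eventHandler")
      handler ! JobStarted(1)
      handler ! JobCompleted(1)
      system.shutdown()
    }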
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala   |   6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala        |  22
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  | 208
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala          |   2
4 files changed, 232 insertions(+), 6 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ee6b433d1f..2e3a1e66ad 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -375,11 +375,7 @@ class BasicOperationsSuite extends TestSuiteBase {
}
test("slice") {
- val conf2 = new SparkConf()
- .setMaster("local[2]")
- .setAppName("BasicOperationsSuite")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
- val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1))
+ val ssc = new StreamingContext(conf, Seconds(1))
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
ssc.registerInputStream(stream)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6499de98c9..9590bca989 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.FileInputDStream
import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
/**
* This test suite tests the checkpointing functionality of DStreams -
@@ -142,6 +144,26 @@ class CheckpointSuite extends TestSuiteBase {
ssc = null
}
+ // This tests whether the SparkConf persists through checkpoints, and that
+ // certain configs get scrubbed
+ test("persistence of conf through checkpoints") {
+ val key = "spark.mykey"
+ val value = "myvalue"
+ System.setProperty(key, value)
+ ssc = new StreamingContext(master, framework, batchDuration)
+ val cp = new Checkpoint(ssc, Time(1000))
+ assert(!cp.sparkConf.contains("spark.driver.host"))
+ assert(!cp.sparkConf.contains("spark.driver.port"))
+ assert(!cp.sparkConf.contains("spark.hostPort"))
+ assert(cp.sparkConf.get(key) === value)
+ ssc.stop()
+ val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+ assert(!newCp.sparkConf.contains("spark.driver.host"))
+ assert(!newCp.sparkConf.contains("spark.driver.port"))
+ assert(!newCp.sparkConf.contains("spark.hostPort"))
+ assert(newCp.sparkConf.get(key) === value)
+ }
+
// This tests whether the system can recover from a master failure with simple
// non-stateful operations. This assumes a reliable, replayable input
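The scrubbing asserted in the new test above drops per-run driver endpoints from the checkpointed SparkConf, so a restarted driver does not reconnect to stale addresses while user keys such as "spark.mykey" survive the round trip. A hedged sketch of that contract, assuming SparkConf's clone and remove methods behave as in later releases; this illustrates the asserted behavior, not the actual Checkpoint implementation:

    import org.apache.spark.SparkConf

    // Drop driver-specific endpoints before persisting the conf;
    // everything else is kept and restored on recovery.
    def scrubbedConf(conf: SparkConf): SparkConf = {
      val perRunKeys = Seq("spark.driver.host", "spark.driver.port", "spark.hostPort")
      val copy = conf.clone()
      perRunKeys.foreach(copy.remove)
      copy
    }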
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
new file mode 100644
index 0000000000..9eb9b3684c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.{SparkException, SparkConf, SparkContext}
+import org.apache.spark.util.{Utils, MetadataCleaner}
+
+class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
+
+ val master = "local[2]"
+ val appName = this.getClass.getSimpleName
+ val batchDuration = Seconds(1)
+ val sparkHome = "someDir"
+ val envPair = "key" -> "value"
+ val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
+
+ var sc: SparkContext = null
+ var ssc: StreamingContext = null
+
+ before {
+ System.clearProperty("spark.cleaner.ttl")
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ }
+
+ test("from no conf constructor") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ assert(ssc.sparkContext.conf.get("spark.master") === master)
+ assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
+ assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
+ StreamingContext.DEFAULT_CLEANER_TTL)
+ }
+
+ test("from no conf + spark home") {
+ ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
+ assert(ssc.conf.get("spark.home") === sparkHome)
+ assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
+ StreamingContext.DEFAULT_CLEANER_TTL)
+ }
+
+ test("from no conf + spark home + env") {
+ ssc = new StreamingContext(master, appName, batchDuration,
+ sparkHome, Nil, Map(envPair))
+ assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
+ assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
+ StreamingContext.DEFAULT_CLEANER_TTL)
+ }
+
+ test("from conf without ttl set") {
+ val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+ ssc = new StreamingContext(myConf, batchDuration)
+ assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
+ StreamingContext.DEFAULT_CLEANER_TTL)
+ }
+
+ test("from conf with ttl set") {
+ val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+ myConf.set("spark.cleaner.ttl", ttl.toString)
+ ssc = new StreamingContext(myConf, batchDuration)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ }
+
+ test("from existing SparkContext without ttl set") {
+ sc = new SparkContext(master, appName)
+ val exception = intercept[SparkException] {
+ ssc = new StreamingContext(sc, batchDuration)
+ }
+ assert(exception.getMessage.contains("ttl"))
+ }
+
+ test("from existing SparkContext with ttl set") {
+ val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+ myConf.set("spark.cleaner.ttl", ttl.toString)
+ ssc = new StreamingContext(myConf, batchDuration)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ }
+
+ test("from checkpoint") {
+ val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+ myConf.set("spark.cleaner.ttl", ttl.toString)
+ val ssc1 = new StreamingContext(myConf, batchDuration)
+ val cp = new Checkpoint(ssc1, Time(1000))
+ assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+ ssc1.stop()
+ val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+ assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+ ssc = new StreamingContext(null, cp, null)
+ assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+ }
+
+ test("start multiple times") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register
+
+ ssc.start()
+ intercept[SparkException] {
+ ssc.start()
+ }
+ }
+
+ test("stop multiple times") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ ssc.stop()
+ ssc.stop()
+ ssc = null
+ }
+
+ test("stop only streaming context") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ ssc.stop(false)
+ ssc = null
+ assert(sc.makeRDD(1 to 100).collect().size === 100)
+ }
+
+ test("waitForStop") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val inputStream = addInputStream(ssc)
+ inputStream.map(x => x).register
+
+ // test whether start() blocks indefinitely or not
+ failAfter(2000 millis) {
+ ssc.start()
+ }
+
+ // test whether waitForStop() exits after the given amount of time
+ failAfter(1000 millis) {
+ ssc.waitForStop(500)
+ }
+
+ // test whether waitForStop() does not exit if no timeout is given
+ val exception = intercept[Exception] {
+ failAfter(1000 millis) {
+ ssc.waitForStop()
+ throw new Exception("Did not wait for stop")
+ }
+ }
+ assert(exception.isInstanceOf[TestFailedDueToTimeoutException], "Did not wait for stop")
+
+ // test whether wait exits if context is stopped
+ failAfter(10000 millis) { // 10 seconds because Spark takes a long time to shut down
+ new Thread() {
+ override def run {
+ Thread.sleep(500)
+ ssc.stop()
+ }
+ }.start()
+ ssc.waitForStop()
+ }
+ }
+
+ test("waitForStop with error in task") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val inputStream = addInputStream(ssc)
+ inputStream.map(x => { throw new TestException("error in map task"); x})
+ .foreach(_.count)
+
+ val exception = intercept[Exception] {
+ ssc.start()
+ ssc.waitForStop(5000)
+ }
+ assert(exception.getMessage.contains("map task"), "Expected exception not thrown")
+ }
+
+ test("waitForStop with error in job generation") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val inputStream = addInputStream(ssc)
+
+ inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
+ val exception = intercept[TestException] {
+ ssc.start()
+ ssc.waitForStop(5000)
+ }
+ assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
+ }
+
+ def addInputStream(s: StreamingContext): DStream[Int] = {
+ val input = (1 to 100).map(i => (1 to i))
+ val inputStream = new TestInputStream(s, input, 1)
+ s.registerInputStream(inputStream)
+ inputStream
+ }
+}
+
+class TestException(msg: String) extends Exception(msg) \ No newline at end of file
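Taken together, the tests above pin down waitForStop's contract: it blocks the caller until the context is stopped or the optional millisecond timeout elapses, and it surfaces errors raised in tasks or job generation. A short usage sketch of how a driver program might rely on it, assuming an already constructed ssc with streams registered:

    // start processing on background threads, then park the main thread
    ssc.start()
    ssc.waitForStop()       // returns when ssc.stop() is called elsewhere,
                            // or throws if the streaming job fails
    // alternatively, bound the wait:
    // ssc.waitForStop(10000)  // give up after at most 10 seconds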
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index b20d02f996..3569624d51 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val conf = new SparkConf()
.setMaster(master)
.setAppName(framework)
- .set("spark.cleaner.ttl", "3600")
+ .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )