diff options
author | Matei Zaharia <matei@databricks.com> | 2013-12-28 21:21:06 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2013-12-28 21:21:06 -0500 |
commit | 578bd1fc28513eb84002c604000250f5cff9b815 (patch) | |
tree | 715a061f7121ddb9a1eefe5215769564c33141e7 /streaming | |
parent | 5bbe73864eea78b76448ce42a7af847dad73b269 (diff) | |
download | spark-578bd1fc28513eb84002c604000250f5cff9b815.tar.gz spark-578bd1fc28513eb84002c604000250f5cff9b815.tar.bz2 spark-578bd1fc28513eb84002c604000250f5cff9b815.zip |
Fix test failures due to setting / clearing clock type in Streaming
Diffstat (limited to 'streaming')
4 files changed, 14 insertions, 10 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f5b7..a1db0995e3 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -62,13 +62,14 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint"); } @After public void tearDown() { + System.clearProperty("spark.streaming.clock"); ssc.stop(); ssc = null; @@ -101,7 +102,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon")); - List<List<Integer>> expected = Arrays.asList( + List<List<Integer>> expected = Arrays.asList( Arrays.asList(5,5), Arrays.asList(9,4)); diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 259ef1608c..60e986cb9d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -23,14 +23,13 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import util.ManualClock +import org.apache.spark.{SparkContext, SparkConf} class BasicOperationsSuite extends TestSuiteBase { - override def framework() = "BasicOperationsSuite" + override def framework = "BasicOperationsSuite" - before { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - } + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown @@ -387,7 +386,11 @@ class BasicOperationsSuite extends TestSuiteBase { } test("slice") { - val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1)) + val conf2 = new SparkConf() + .setMaster("local[2]") + .setAppName("BasicOperationsSuite") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index a265284bff..3dd6718491 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Whether to actually wait in real time before changing manual clock def actuallyWait = false + // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things. val conf = new SparkConf() .setMaster(master) .setAppName(framework) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index f50e05c0d8..3242c4cd11 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") override def framework = "WindowOperationsSuite" |