aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-28 21:21:06 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-28 21:21:06 -0500
commit578bd1fc28513eb84002c604000250f5cff9b815 (patch)
tree715a061f7121ddb9a1eefe5215769564c33141e7 /streaming
parent5bbe73864eea78b76448ce42a7af847dad73b269 (diff)
downloadspark-578bd1fc28513eb84002c604000250f5cff9b815.tar.gz
spark-578bd1fc28513eb84002c604000250f5cff9b815.tar.bz2
spark-578bd1fc28513eb84002c604000250f5cff9b815.zip
Fix test failures due to setting / clearing clock type in Streaming
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java7
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala13
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala3
4 files changed, 14 insertions, 10 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index daeb99f5b7..a1db0995e3 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -62,13 +62,14 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint");
}
@After
public void tearDown() {
+ System.clearProperty("spark.streaming.clock");
ssc.stop();
ssc = null;
@@ -101,7 +102,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("hello", "world"),
Arrays.asList("goodnight", "moon"));
- List<List<Integer>> expected = Arrays.asList(
+ List<List<Integer>> expected = Arrays.asList(
Arrays.asList(5,5),
Arrays.asList(9,4));
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 259ef1608c..60e986cb9d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -23,14 +23,13 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import util.ManualClock
+import org.apache.spark.{SparkContext, SparkConf}
class BasicOperationsSuite extends TestSuiteBase {
- override def framework() = "BasicOperationsSuite"
+ override def framework = "BasicOperationsSuite"
- before {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
- }
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
@@ -387,7 +386,11 @@ class BasicOperationsSuite extends TestSuiteBase {
}
test("slice") {
- val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+ val conf2 = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("BasicOperationsSuite")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1))
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
val stream = new TestInputStream[Int](ssc, input, 2)
ssc.registerInputStream(stream)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index a265284bff..3dd6718491 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -130,6 +130,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Whether to actually wait in real time before changing manual clock
def actuallyWait = false
+ // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
val conf = new SparkConf()
.setMaster(master)
.setAppName(framework)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index f50e05c0d8..3242c4cd11 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,11 +18,10 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
override def framework = "WindowOperationsSuite"