From 33bad85bb9143d41bc5de2068f7e8a8c39928225 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 20 Jan 2013 03:51:11 -0800 Subject: Fixed streaming testsuite bugs --- streaming/src/test/java/JavaAPISuite.java | 2 ++ streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 5 +++++ streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 6 +++--- streaming/src/test/scala/spark/streaming/FailureSuite.scala | 3 +++ streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 3 +++ streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 6 +++--- .../src/test/scala/spark/streaming/WindowOperationsSuite.scala | 5 +++++ 7 files changed, 24 insertions(+), 6 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java index 8c94e13e65..c84e7331c7 100644 --- a/streaming/src/test/java/JavaAPISuite.java +++ b/streaming/src/test/java/JavaAPISuite.java @@ -34,12 +34,14 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint", new Duration(1000)); } @After public void tearDown() { ssc.stop(); ssc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port"); } diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index f73f9b1823..bfdf32c73e 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -8,6 +8,11 @@ class BasicOperationsSuite extends TestSuiteBase { override def framework() = "BasicOperationsSuite" + after { + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + test("map") { val input = Seq(1 to 4, 5 to 8, 9 to 12) testOperation( diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 920388bba9..d2f32c189b 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -15,9 +15,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } after { - if (ssc != null) ssc.stop() FileUtils.deleteDirectory(new File(checkpointDir)) + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } var ssc: StreamingContext = null @@ -26,8 +28,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { override def batchDuration = Milliseconds(500) - override def checkpointDir = "checkpoint" - override def checkpointInterval = batchDuration override def actuallyWait = true diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 4aa428bf64..7493ac1207 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -22,6 +22,9 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { after { FailureSuite.reset() FileUtils.deleteDirectory(new File(checkpointDir)) + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } override def framework = "CheckpointSuite" diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index e71ba6ddc1..d7ba7a5d17 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -40,6 +40,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(testDir) testDir = null } + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("network input stream") { diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index a76f61d4ad..49129f3964 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -10,7 +10,7 @@ import collection.mutable.SynchronizedBuffer import java.io.{ObjectInputStream, IOException} -import org.scalatest.FunSuite +import org.scalatest.{BeforeAndAfter, FunSuite} /** * This is a input stream just for the testsuites. This is equivalent to a checkpointable, @@ -56,7 +56,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu * This is the base trait for Spark Streaming testsuites. This provides basic functionality * to run user-defined set of input on user-defined stream operations, and verify the output. */ -trait TestSuiteBase extends FunSuite with Logging { +trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { def framework = "TestSuiteBase" @@ -64,7 +64,7 @@ trait TestSuiteBase extends FunSuite with Logging { def batchDuration = Seconds(1) - def checkpointDir = null.asInstanceOf[String] + def checkpointDir = "checkpoint" def checkpointInterval = batchDuration diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index f9ba1f20f0..0c6e928835 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -11,6 +11,11 @@ class WindowOperationsSuite extends TestSuiteBase { override def batchDuration = Seconds(1) + after { + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + } + val largerSlideInput = Seq( Seq(("a", 1)), Seq(("a", 2)), // 1st window from here -- cgit v1.2.3