aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-20 03:51:11 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-20 03:51:11 -0800
commit33bad85bb9143d41bc5de2068f7e8a8c39928225 (patch)
tree87695f0e35c893ab2c9f3cc2c1e369beadc9c623 /streaming/src
parent4f8fe58b2579b6040bb9a7d1fcf9adc19843a97b (diff)
downloadspark-33bad85bb9143d41bc5de2068f7e8a8c39928225.tar.gz
spark-33bad85bb9143d41bc5de2068f7e8a8c39928225.tar.bz2
spark-33bad85bb9143d41bc5de2068f7e8a8c39928225.zip
Fixed streaming testsuite bugs
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/test/java/JavaAPISuite.java2
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala5
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala6
-rw-r--r--streaming/src/test/scala/spark/streaming/FailureSuite.scala3
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala3
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala6
-rw-r--r--streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala5
7 files changed, 24 insertions, 6 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/JavaAPISuite.java
index 8c94e13e65..c84e7331c7 100644
--- a/streaming/src/test/java/JavaAPISuite.java
+++ b/streaming/src/test/java/JavaAPISuite.java
@@ -34,12 +34,14 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint", new Duration(1000));
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
+
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port");
}
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index f73f9b1823..bfdf32c73e 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -8,6 +8,11 @@ class BasicOperationsSuite extends TestSuiteBase {
override def framework() = "BasicOperationsSuite"
+ after {
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
test("map") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 920388bba9..d2f32c189b 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -15,9 +15,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
after {
-
if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
var ssc: StreamingContext = null
@@ -26,8 +28,6 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def batchDuration = Milliseconds(500)
- override def checkpointDir = "checkpoint"
-
override def checkpointInterval = batchDuration
override def actuallyWait = true
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 4aa428bf64..7493ac1207 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -22,6 +22,9 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
after {
FailureSuite.reset()
FileUtils.deleteDirectory(new File(checkpointDir))
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
override def framework = "CheckpointSuite"
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index e71ba6ddc1..d7ba7a5d17 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -40,6 +40,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(testDir)
testDir = null
}
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("network input stream") {
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index a76f61d4ad..49129f3964 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -10,7 +10,7 @@ import collection.mutable.SynchronizedBuffer
import java.io.{ObjectInputStream, IOException}
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -56,7 +56,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
* This is the base trait for Spark Streaming testsuites. This provides basic functionality
* to run user-defined set of input on user-defined stream operations, and verify the output.
*/
-trait TestSuiteBase extends FunSuite with Logging {
+trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def framework = "TestSuiteBase"
@@ -64,7 +64,7 @@ trait TestSuiteBase extends FunSuite with Logging {
def batchDuration = Seconds(1)
- def checkpointDir = null.asInstanceOf[String]
+ def checkpointDir = "checkpoint"
def checkpointInterval = batchDuration
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index f9ba1f20f0..0c6e928835 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -11,6 +11,11 @@ class WindowOperationsSuite extends TestSuiteBase {
override def batchDuration = Seconds(1)
+ after {
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
val largerSlideInput = Seq(
Seq(("a", 1)),
Seq(("a", 2)), // 1st window from here