aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java14
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala1
2 files changed, 15 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index b1adf881dd..2e00b980b9 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -72,6 +72,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
+ public void testContextState() {
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
+ Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream);
+ Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ ssc.start();
+ Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+ ssc.stop();
+ Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3,4),
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index c0ea0491c3..bb80bff6dc 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))