diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 18:53:50 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 18:53:50 -0700 |
commit | f9c7580adadce75a94bd2854cf4f743d8cbd1d23 (patch) | |
tree | 495efb608b2ce4f1eb7e07416bdba9b10c92d30b /streaming/src/test/java | |
parent | 35fb42a0b01d3043b7d5e27256d1b45a08583aab (diff) | |
download | spark-f9c7580adadce75a94bd2854cf4f743d8cbd1d23.tar.gz spark-f9c7580adadce75a94bd2854cf4f743d8cbd1d23.tar.bz2 spark-f9c7580adadce75a94bd2854cf4f743d8cbd1d23.zip |
[SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6058 from tdas/SPARK-7530 and squashes the following commits:
80ee0e6 [Tathagata Das] STARTED --> ACTIVE
3da6547 [Tathagata Das] Added synchronized
dd88444 [Tathagata Das] Added more docs
e1a8505 [Tathagata Das] Fixed comment length
89f9980 [Tathagata Das] Change to Java enum and added Java test
7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
dd4e702 [Tathagata Das] Addressed comments.
3d56106 [Tathagata Das] Added Mima excludes
2b86ba1 [Tathagata Das] Added scala docs.
1722433 [Tathagata Das] Fixed style
976b094 [Tathagata Das] Added license
0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 14 | ||||
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala | 1 |
2 files changed, 15 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index b1adf881dd..2e00b980b9 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -72,6 +72,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @SuppressWarnings("unchecked") @Test + public void testContextState() { + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); + Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaTestUtils.attachTestOutputStream(stream); + Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + ssc.start(); + Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE); + ssc.stop(); + Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED); + } + + @SuppressWarnings("unchecked") + @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,2,3,4), diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index c0ea0491c3..bb80bff6dc 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase { ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + ssc.getState() val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) |