From f9c7580adadce75a94bd2854cf4f743d8cbd1d23 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 May 2015 18:53:50 -0700 Subject: [SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context Author: Tathagata Das Closes #6058 from tdas/SPARK-7530 and squashes the following commits: 80ee0e6 [Tathagata Das] STARTED --> ACTIVE 3da6547 [Tathagata Das] Added synchronized dd88444 [Tathagata Das] Added more docs e1a8505 [Tathagata Das] Fixed comment length 89f9980 [Tathagata Das] Change to Java enum and added Java test 7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530 dd4e702 [Tathagata Das] Addressed comments. 3d56106 [Tathagata Das] Added Mima excludes 2b86ba1 [Tathagata Das] Added scala docs. 1722433 [Tathagata Das] Fixed style 976b094 [Tathagata Das] Added license 0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530 e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState --- .../java/org/apache/spark/streaming/JavaAPISuite.java | 14 ++++++++++++++ .../org/apache/spark/streaming/JavaTestUtils.scala | 1 + .../spark/streaming/StreamingContextSuite.scala | 19 ++++++++++++++++--- 3 files changed, 31 insertions(+), 3 deletions(-) (limited to 'streaming/src/test') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index b1adf881dd..2e00b980b9 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -70,6 +70,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertNotNull(ssc.sparkContext()); } + @SuppressWarnings("unchecked") + @Test + public void testContextState() { + List> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); + Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaTestUtils.attachTestOutputStream(stream); + Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + ssc.start(); + Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE); + ssc.stop(); + Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED); + } + @SuppressWarnings("unchecked") @Test public void testCount() { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index c0ea0491c3..bb80bff6dc 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase { ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + ssc.getState() val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 11c7fd835b..b8247db7e8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) } + test("state matching") { + import StreamingContextState._ + assert(INITIALIZED === INITIALIZED) + assert(INITIALIZED != ACTIVE) + } + test("start and stop state check") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() - assert(ssc.state === ssc.StreamingContextState.Initialized) + assert(ssc.getState() === StreamingContextState.INITIALIZED) ssc.start() - assert(ssc.state === ssc.StreamingContextState.Started) + assert(ssc.getState() === StreamingContextState.ACTIVE) ssc.stop() - assert(ssc.state === ssc.StreamingContextState.Stopped) + assert(ssc.getState() === StreamingContextState.STOPPED) // Make sure that the SparkContext is also stopped by default intercept[Exception] { @@ -129,9 +135,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() ssc.start() + assert(ssc.getState() === StreamingContextState.ACTIVE) intercept[SparkException] { ssc.start() } + assert(ssc.getState() === StreamingContextState.ACTIVE) } test("stop multiple times") { @@ -139,13 +147,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() ssc.stop() + assert(ssc.getState() === StreamingContextState.STOPPED) ssc.stop() + assert(ssc.getState() === StreamingContextState.STOPPED) } test("stop before start") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() ssc.stop() // stop before start should not throw exception + assert(ssc.getState() === StreamingContextState.STOPPED) } test("start after stop") { @@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w intercept[SparkException] { ssc.start() // start after stop should throw exception } + assert(ssc.getState() === StreamingContextState.STOPPED) } test("stop only streaming context") { @@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() ssc.stop(stopSparkContext = false) + assert(ssc.getState() === StreamingContextState.STOPPED) assert(sc.makeRDD(1 to 100).collect().size === 100) sc.stop() -- cgit v1.2.3