diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 18:53:50 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 18:54:06 -0700 |
commit | c16b47f9ed2753384bff6fe0fc303ae4f3df8eb8 (patch) | |
tree | 7db4a8b22649fa2e1819fde9b1caba451fc886a9 | |
parent | f1888159894c2ed446cae0ce52fad199e2dda4cd (diff) | |
download | spark-c16b47f9ed2753384bff6fe0fc303ae4f3df8eb8.tar.gz spark-c16b47f9ed2753384bff6fe0fc303ae4f3df8eb8.tar.bz2 spark-c16b47f9ed2753384bff6fe0fc303ae4f3df8eb8.zip |
[SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6058 from tdas/SPARK-7530 and squashes the following commits:
80ee0e6 [Tathagata Das] STARTED --> ACTIVE
3da6547 [Tathagata Das] Added synchronized
dd88444 [Tathagata Das] Added more docs
e1a8505 [Tathagata Das] Fixed comment length
89f9980 [Tathagata Das] Change to Java enum and added Java test
7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
dd4e702 [Tathagata Das] Addressed comments.
3d56106 [Tathagata Das] Added Mima excludes
2b86ba1 [Tathagata Das] Added scala docs.
1722433 [Tathagata Das] Fixed style
976b094 [Tathagata Das] Added license
0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530
e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState
(cherry picked from commit f9c7580adadce75a94bd2854cf4f743d8cbd1d23)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
7 files changed, 147 insertions, 33 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index cfe387faec..ad3d8426bd 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -106,6 +106,10 @@ object MimaExcludes { "org.apache.spark.sql.parquet.ParquetTestData$"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.parquet.TestGroupWriteSupport") + ) ++ Seq( + // SPARK-7530 Added StreamingContext.getState() + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.StreamingContext.state_=") ) case v if v.startsWith("1.3") => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 5abe136775..2c5834defa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -32,10 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver} import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} @@ -195,14 +196,7 @@ class StreamingContext private[streaming] ( assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) - /** Enumeration to identify current state of the StreamingContext */ - private[streaming] object StreamingContextState extends Enumeration { - type CheckpointState = Value - val Initialized, Started, Stopped = Value - } - - import StreamingContextState._ - private[streaming] var state = Initialized + private var state: StreamingContextState = INITIALIZED private val startSite = new AtomicReference[CallSite](null) @@ -517,17 +511,34 @@ class StreamingContext private[streaming] ( } /** + * :: DeveloperApi :: + * + * Return the current state of the context. The context can be in three possible states - + * - StreamingContextState.INTIALIZED - The context has been created, but not been started yet. + * Input DStreams, transformations and output operations can be created on the context. + * - StreamingContextState.ACTIVE - The context has been started, and been not stopped. + * Input DStreams, transformations and output operations cannot be created on the context. + * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more. + */ + @DeveloperApi + def getState(): StreamingContextState = synchronized { + state + } + + /** * Start the execution of the streams. * * @throws SparkException if the context has already been started or stopped. */ def start(): Unit = synchronized { import StreamingContext._ - if (state == Started) { - throw new SparkException("StreamingContext has already been started") - } - if (state == Stopped) { - throw new SparkException("StreamingContext has already been stopped") + state match { + case INITIALIZED => + // good to start + case ACTIVE => + throw new SparkException("StreamingContext has already been started") + case STOPPED => + throw new SparkException("StreamingContext has already been stopped") } validate() startSite.set(DStream.getCreationSite()) @@ -536,7 +547,7 @@ class StreamingContext private[streaming] ( assertNoOtherContextIsActive() scheduler.start() uiTab.foreach(_.attach()) - state = Started + state = StreamingContextState.ACTIVE setActiveContext(this) } } @@ -598,22 +609,26 @@ class StreamingContext private[streaming] ( * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - state match { - case Initialized => logWarning("StreamingContext has not been started yet") - case Stopped => logWarning("StreamingContext has already been stopped") - case Started => - scheduler.stop(stopGracefully) - logInfo("StreamingContext stopped successfully") - waiter.notifyStop() + try { + state match { + case INITIALIZED => + logWarning("StreamingContext has not been started yet") + case STOPPED => + logWarning("StreamingContext has already been stopped") + case ACTIVE => + scheduler.stop(stopGracefully) + uiTab.foreach(_.detach()) + StreamingContext.setActiveContext(null) + waiter.notifyStop() + logInfo("StreamingContext stopped successfully") + } + // Even if we have already stopped, we still need to attempt to stop the SparkContext because + // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). + if (stopSparkContext) sc.stop() + } finally { + // The state should always be Stopped after calling `stop()`, even if we haven't started yet + state = STOPPED } - // Even if the streaming context has not been started, we still need to stop the SparkContext. - // Even if we have already stopped, we still need to attempt to stop the SparkContext because - // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). - if (stopSparkContext) sc.stop() - uiTab.foreach(_.detach()) - // The state should always be Stopped after calling `stop()`, even if we haven't started yet: - state = Stopped - StreamingContext.setActiveContext(null) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java new file mode 100644 index 0000000000..d7b639383e --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.annotation.DeveloperApi; + +/** + * :: DeveloperApi :: + * + * Represents the state of a StreamingContext. + */ +@DeveloperApi +public enum StreamingContextState { + /** + * The context has been created, but not been started yet. + * Input DStreams, transformations and output operations can be created on the context. + */ + INITIALIZED, + + /** + * The context has been started, and been not stopped. + * Input DStreams, transformations and output operations cannot be created on the context. + */ + ACTIVE, + + /** + * The context has been stopped and cannot be used any more. + */ + STOPPED +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 572d7d8e87..d8fbed2c50 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -579,6 +579,28 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { } /** + * :: DeveloperApi :: + * + * Return the current state of the context. The context can be in three possible states - + * <ul> + * <li> + * StreamingContextState.INTIALIZED - The context has been created, but not been started yet. + * Input DStreams, transformations and output operations can be created on the context. + * </li> + * <li> + * StreamingContextState.ACTIVE - The context has been started, and been not stopped. + * Input DStreams, transformations and output operations cannot be created on the context. + * </li> + * <li> + * StreamingContextState.STOPPED - The context has been stopped and cannot be used any more. + * </li> + * </ul> + */ + def getState(): StreamingContextState = { + ssc.getState() + } + + /** * Start the execution of the streams. */ def start(): Unit = { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index b1adf881dd..2e00b980b9 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -72,6 +72,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @SuppressWarnings("unchecked") @Test + public void testContextState() { + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); + Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaTestUtils.attachTestOutputStream(stream); + Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED); + ssc.start(); + Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE); + ssc.stop(); + Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED); + } + + @SuppressWarnings("unchecked") + @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,2,3,4), diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index c0ea0491c3..bb80bff6dc 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase { ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + ssc.getState() val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 11c7fd835b..b8247db7e8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10) } + test("state matching") { + import StreamingContextState._ + assert(INITIALIZED === INITIALIZED) + assert(INITIALIZED != ACTIVE) + } + test("start and stop state check") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() - assert(ssc.state === ssc.StreamingContextState.Initialized) + assert(ssc.getState() === StreamingContextState.INITIALIZED) ssc.start() - assert(ssc.state === ssc.StreamingContextState.Started) + assert(ssc.getState() === StreamingContextState.ACTIVE) ssc.stop() - assert(ssc.state === ssc.StreamingContextState.Stopped) + assert(ssc.getState() === StreamingContextState.STOPPED) // Make sure that the SparkContext is also stopped by default intercept[Exception] { @@ -129,9 +135,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() ssc.start() + assert(ssc.getState() === StreamingContextState.ACTIVE) intercept[SparkException] { ssc.start() } + assert(ssc.getState() === StreamingContextState.ACTIVE) } test("stop multiple times") { @@ -139,13 +147,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() ssc.stop() + assert(ssc.getState() === StreamingContextState.STOPPED) ssc.stop() + assert(ssc.getState() === StreamingContextState.STOPPED) } test("stop before start") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register() ssc.stop() // stop before start should not throw exception + assert(ssc.getState() === StreamingContextState.STOPPED) } test("start after stop") { @@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w intercept[SparkException] { ssc.start() // start after stop should throw exception } + assert(ssc.getState() === StreamingContextState.STOPPED) } test("stop only streaming context") { @@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() ssc.stop(stopSparkContext = false) + assert(ssc.getState() === StreamingContextState.STOPPED) assert(sc.makeRDD(1 to 100).collect().size === 100) sc.stop() |