aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-11 18:53:50 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-11 18:53:50 -0700
commitf9c7580adadce75a94bd2854cf4f743d8cbd1d23 (patch)
tree495efb608b2ce4f1eb7e07416bdba9b10c92d30b /streaming
parent35fb42a0b01d3043b7d5e27256d1b45a08583aab (diff)
downloadspark-f9c7580adadce75a94bd2854cf4f743d8cbd1d23.tar.gz
spark-f9c7580adadce75a94bd2854cf4f743d8cbd1d23.tar.bz2
spark-f9c7580adadce75a94bd2854cf4f743d8cbd1d23.zip
[SPARK-7530] [STREAMING] Added StreamingContext.getState() to expose the current state of the context
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6058 from tdas/SPARK-7530 and squashes the following commits: 80ee0e6 [Tathagata Das] STARTED --> ACTIVE 3da6547 [Tathagata Das] Added synchronized dd88444 [Tathagata Das] Added more docs e1a8505 [Tathagata Das] Fixed comment length 89f9980 [Tathagata Das] Change to Java enum and added Java test 7c57351 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530 dd4e702 [Tathagata Das] Addressed comments. 3d56106 [Tathagata Das] Added Mima excludes 2b86ba1 [Tathagata Das] Added scala docs. 1722433 [Tathagata Das] Fixed style 976b094 [Tathagata Das] Added license 0585130 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7530 e0f0a05 [Tathagata Das] Added getState and exposed StreamingContextState
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala75
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java45
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala22
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java14
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala19
6 files changed, 143 insertions, 33 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 5abe136775..2c5834defa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -32,10 +32,11 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
@@ -195,14 +196,7 @@ class StreamingContext private[streaming] (
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
- /** Enumeration to identify current state of the StreamingContext */
- private[streaming] object StreamingContextState extends Enumeration {
- type CheckpointState = Value
- val Initialized, Started, Stopped = Value
- }
-
- import StreamingContextState._
- private[streaming] var state = Initialized
+ private var state: StreamingContextState = INITIALIZED
private val startSite = new AtomicReference[CallSite](null)
@@ -517,17 +511,34 @@ class StreamingContext private[streaming] (
}
/**
+ * :: DeveloperApi ::
+ *
+ * Return the current state of the context. The context can be in three possible states -
+ * - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
+ * Input DStreams, transformations and output operations can be created on the context.
+ * - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
+ * Input DStreams, transformations and output operations cannot be created on the context.
+ * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+ */
+ @DeveloperApi
+ def getState(): StreamingContextState = synchronized {
+ state
+ }
+
+ /**
* Start the execution of the streams.
*
* @throws SparkException if the context has already been started or stopped.
*/
def start(): Unit = synchronized {
import StreamingContext._
- if (state == Started) {
- throw new SparkException("StreamingContext has already been started")
- }
- if (state == Stopped) {
- throw new SparkException("StreamingContext has already been stopped")
+ state match {
+ case INITIALIZED =>
+ // good to start
+ case ACTIVE =>
+ throw new SparkException("StreamingContext has already been started")
+ case STOPPED =>
+ throw new SparkException("StreamingContext has already been stopped")
}
validate()
startSite.set(DStream.getCreationSite())
@@ -536,7 +547,7 @@ class StreamingContext private[streaming] (
assertNoOtherContextIsActive()
scheduler.start()
uiTab.foreach(_.attach())
- state = Started
+ state = StreamingContextState.ACTIVE
setActiveContext(this)
}
}
@@ -598,22 +609,26 @@ class StreamingContext private[streaming] (
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
- state match {
- case Initialized => logWarning("StreamingContext has not been started yet")
- case Stopped => logWarning("StreamingContext has already been stopped")
- case Started =>
- scheduler.stop(stopGracefully)
- logInfo("StreamingContext stopped successfully")
- waiter.notifyStop()
+ try {
+ state match {
+ case INITIALIZED =>
+ logWarning("StreamingContext has not been started yet")
+ case STOPPED =>
+ logWarning("StreamingContext has already been stopped")
+ case ACTIVE =>
+ scheduler.stop(stopGracefully)
+ uiTab.foreach(_.detach())
+ StreamingContext.setActiveContext(null)
+ waiter.notifyStop()
+ logInfo("StreamingContext stopped successfully")
+ }
+ // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+ // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
+ if (stopSparkContext) sc.stop()
+ } finally {
+ // The state should always be Stopped after calling `stop()`, even if we haven't started yet
+ state = STOPPED
}
- // Even if the streaming context has not been started, we still need to stop the SparkContext.
- // Even if we have already stopped, we still need to attempt to stop the SparkContext because
- // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
- if (stopSparkContext) sc.stop()
- uiTab.foreach(_.detach())
- // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
- state = Stopped
- StreamingContext.setActiveContext(null)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java
new file mode 100644
index 0000000000..d7b639383e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContextState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Represents the state of a StreamingContext.
+ */
+@DeveloperApi
+public enum StreamingContextState {
+ /**
+ * The context has been created, but not been started yet.
+ * Input DStreams, transformations and output operations can be created on the context.
+ */
+ INITIALIZED,
+
+ /**
+ * The context has been started, and been not stopped.
+ * Input DStreams, transformations and output operations cannot be created on the context.
+ */
+ ACTIVE,
+
+ /**
+ * The context has been stopped and cannot be used any more.
+ */
+ STOPPED
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 572d7d8e87..d8fbed2c50 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -579,6 +579,28 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
+ * :: DeveloperApi ::
+ *
+ * Return the current state of the context. The context can be in three possible states -
+ * <ul>
+ * <li>
+ * StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
+ * Input DStreams, transformations and output operations can be created on the context.
+ * </li>
+ * <li>
+ * StreamingContextState.ACTIVE - The context has been started, and been not stopped.
+ * Input DStreams, transformations and output operations cannot be created on the context.
+ * </li>
+ * <li>
+ * StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+ * </li>
+ * </ul>
+ */
+ def getState(): StreamingContextState = {
+ ssc.getState()
+ }
+
+ /**
* Start the execution of the streams.
*/
def start(): Unit = {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index b1adf881dd..2e00b980b9 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -72,6 +72,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
+ public void testContextState() {
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
+ Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream);
+ Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+ ssc.start();
+ Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+ ssc.stop();
+ Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3,4),
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index c0ea0491c3..bb80bff6dc 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -70,6 +70,7 @@ trait JavaTestBase extends TestSuiteBase {
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 11c7fd835b..b8247db7e8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -109,15 +109,21 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
+ test("state matching") {
+ import StreamingContextState._
+ assert(INITIALIZED === INITIALIZED)
+ assert(INITIALIZED != ACTIVE)
+ }
+
test("start and stop state check") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
- assert(ssc.state === ssc.StreamingContextState.Initialized)
+ assert(ssc.getState() === StreamingContextState.INITIALIZED)
ssc.start()
- assert(ssc.state === ssc.StreamingContextState.Started)
+ assert(ssc.getState() === StreamingContextState.ACTIVE)
ssc.stop()
- assert(ssc.state === ssc.StreamingContextState.Stopped)
+ assert(ssc.getState() === StreamingContextState.STOPPED)
// Make sure that the SparkContext is also stopped by default
intercept[Exception] {
@@ -129,9 +135,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.start()
+ assert(ssc.getState() === StreamingContextState.ACTIVE)
intercept[SparkException] {
ssc.start()
}
+ assert(ssc.getState() === StreamingContextState.ACTIVE)
}
test("stop multiple times") {
@@ -139,13 +147,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
addInputStream(ssc).register()
ssc.start()
ssc.stop()
+ assert(ssc.getState() === StreamingContextState.STOPPED)
ssc.stop()
+ assert(ssc.getState() === StreamingContextState.STOPPED)
}
test("stop before start") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.stop() // stop before start should not throw exception
+ assert(ssc.getState() === StreamingContextState.STOPPED)
}
test("start after stop") {
@@ -156,6 +167,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
intercept[SparkException] {
ssc.start() // start after stop should throw exception
}
+ assert(ssc.getState() === StreamingContextState.STOPPED)
}
test("stop only streaming context") {
@@ -167,6 +179,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
addInputStream(ssc).register()
ssc.start()
ssc.stop(stopSparkContext = false)
+ assert(ssc.getState() === StreamingContextState.STOPPED)
assert(sc.makeRDD(1 to 100).collect().size === 100)
sc.stop()