-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala       |  61
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  | 152
2 files changed, 202 insertions(+), 11 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8461e90120..407cab45ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -637,8 +637,10 @@ class StreamingContext private[streaming] (
*/
object StreamingContext extends Logging {
+
/**
- * Lock that guards access to global variables that track active StreamingContext.
+ * Lock that guards activation of a StreamingContext as well as access to the singleton active
+ * StreamingContext in getActiveOrCreate().
*/
private val ACTIVATION_LOCK = new Object()
@@ -661,6 +663,18 @@ object StreamingContext extends Logging {
}
}
+ /**
+ * :: Experimental ::
+ *
+ * Get the currently active context, if there is one. Active means started but not stopped.
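+ *
+ * A hypothetical usage sketch (names are illustrative only):
+ * {{{
+ *   StreamingContext.getActive() match {
+ *     case Some(activeSsc) => // a context is started and not yet stopped
+ *     case None => // no context is currently active
+ *   }
+ * }}}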
+ */
+ @Experimental
+ def getActive(): Option[StreamingContext] = {
+ ACTIVATION_LOCK.synchronized {
+ Option(activeContext.get())
+ }
+ }
+
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
@@ -670,6 +684,48 @@ object StreamingContext extends Logging {
}
/**
+ * :: Experimental ::
+ *
+ * Either return the "active" StreamingContext (that is, started but not stopped), or create a
+ * new StreamingContext using the provided `creatingFunc`.
+ * @param creatingFunc Function to create a new StreamingContext
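+ *
+ * A usage sketch, assuming `conf` and `batchDuration` are defined by the caller:
+ * {{{
+ *   val ssc = StreamingContext.getActiveOrCreate { () =>
+ *     val newSsc = new StreamingContext(conf, batchDuration)
+ *     // ... set up DStream operations on newSsc ...
+ *     newSsc
+ *   }
+ * }}}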
+ */
+ @Experimental
+ def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext = {
+ ACTIVATION_LOCK.synchronized {
+ getActive().getOrElse { creatingFunc() }
+ }
+ }
+
+ /**
+ * :: Experimental ::
+ *
+ * Either get the currently active StreamingContext (that is, started but not stopped),
+ * OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data
+ * does not exist in the provided `checkpointPath`, then create a new StreamingContext by
+ * calling the provided `creatingFunc`.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new StreamingContext
+ * @param hadoopConf Optional Hadoop configuration if necessary for reading from the
+ * file system
+ * @param createOnError Optional, whether to create a new StreamingContext if there is an
+ * error in reading checkpoint data. By default, an exception will be
+ * thrown on error.
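+ *
+ * A usage sketch; the checkpoint path and factory function are illustrative assumptions:
+ * {{{
+ *   val ssc = StreamingContext.getActiveOrCreate(
+ *     "/path/to/checkpoint",          // hypothetical checkpoint directory
+ *     () => createStreamingContext(), // hypothetical function returning a new StreamingContext
+ *     createOnError = true)
+ * }}}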
+ */
+ @Experimental
+ def getActiveOrCreate(
+ checkpointPath: String,
+ creatingFunc: () => StreamingContext,
+ hadoopConf: Configuration = new Configuration(),
+ createOnError: Boolean = false
+ ): StreamingContext = {
+ ACTIVATION_LOCK.synchronized {
+ getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) }
+ }
+ }
+
+ /**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
@@ -694,7 +750,6 @@ object StreamingContext extends Logging {
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
-
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
@@ -761,7 +816,7 @@ object StreamingContext extends Logging {
): SparkContext = {
val conf = SparkContext.updatedConf(
new SparkConf(), master, appName, sparkHome, jars, environment)
- createNewSparkContext(conf)
+ new SparkContext(conf)
}
private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 47299513de..5d09b234f7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -41,6 +41,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
var sc: SparkContext = null
var ssc: StreamingContext = null
@@ -390,23 +391,23 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(newContextCreated, "new context not created")
}
- val corrutedCheckpointPath = createCorruptedCheckpoint()
+ val corruptedCheckpointPath = createCorruptedCheckpoint()
// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
}
// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, createOnError = false)
+ corruptedCheckpointPath, creatingFunction _, createOnError = false)
}
// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, createOnError = true)
+ corruptedCheckpointPath, creatingFunction _, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@@ -491,8 +492,145 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
+ test("getActive and getActiveOrCreate") {
+ require(StreamingContext.getActive().isEmpty, "context exists from before")
+ sc = new SparkContext(conf)
+
+ var newContextCreated = false
+
+ def creatingFunc(): StreamingContext = {
+ newContextCreated = true
+ val newSsc = new StreamingContext(sc, batchDuration)
+ val input = addInputStream(newSsc)
+ input.foreachRDD { rdd => rdd.count }
+ newSsc
+ }
+
+ def testGetActiveOrCreate(body: => Unit): Unit = {
+ newContextCreated = false
+ try {
+ body
+ } finally {
+ if (ssc != null) {
+ ssc.stop(stopSparkContext = false)
+ }
+ ssc = null
+ }
+ }
+
+ // getActiveOrCreate should create new context and getActive should return it only
+ // after starting the context
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(creatingFunc _)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated === true, "new context not created")
+ assert(StreamingContext.getActive().isEmpty,
+ "new initialized context returned before starting")
+ ssc.start()
+ assert(StreamingContext.getActive() === Some(ssc),
+ "active context not returned")
+ assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
+ "active context not returned")
+ ssc.stop()
+ assert(StreamingContext.getActive().isEmpty,
+ "inactive context returned")
+ assert(StreamingContext.getActiveOrCreate(creatingFunc _) !== ssc,
+ "inactive context returned")
+ }
+
+ // getActiveOrCreate and getActive should return an independently created context once started
+ testGetActiveOrCreate {
+ ssc = creatingFunc() // Create
+ assert(StreamingContext.getActive().isEmpty,
+ "new initialized context returned before starting")
+ ssc.start()
+ assert(StreamingContext.getActive() === Some(ssc),
+ "active context not returned")
+ assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
+ "active context not returned")
+ ssc.stop()
+ assert(StreamingContext.getActive().isEmpty,
+ "inactive context returned")
+ }
+ }
+
+ test("getActiveOrCreate with checkpoint") {
+ // Function to create a new StreamingContext, flagging that a new context was created
+ var newContextCreated = false
+ def creatingFunction(): StreamingContext = {
+ newContextCreated = true
+ new StreamingContext(conf, batchDuration)
+ }
+
+ // Run a body of code and then stop ssc
+ def testGetActiveOrCreate(body: => Unit): Unit = {
+ require(StreamingContext.getActive().isEmpty) // no active context
+ newContextCreated = false
+ try {
+ body
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ ssc = null
+ }
+ }
+
+ val emptyPath = Utils.createTempDir().getAbsolutePath()
+ val corruptedCheckpointPath = createCorruptedCheckpoint()
+ val checkpointPath = createValidCheckpoint()
+
+ // getActiveOrCreate should return the current active context if there is one
+ testGetActiveOrCreate {
+ ssc = new StreamingContext(
+ conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
+ addInputStream(ssc).register()
+ ssc.start()
+ val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+ assert(!newContextCreated, "new context created instead of returning")
+ assert(returnedSsc.eq(ssc), "returned context is not the activated context")
+ }
+
+ // getActiveOrCreate should create new context with empty path
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ }
+
+ // getActiveOrCreate should throw exception with fake checkpoint file
+ intercept[Exception] {
+ ssc = StreamingContext.getActiveOrCreate(corruptedCheckpointPath, creatingFunction _)
+ }
+
+ // getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false
+ intercept[Exception] {
+ ssc = StreamingContext.getActiveOrCreate(
+ corruptedCheckpointPath, creatingFunction _, createOnError = false)
+ }
+
+ // getActiveOrCreate should create new context with fake
+ // checkpoint file and createOnError = true
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(
+ corruptedCheckpointPath, creatingFunction _, createOnError = true)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ }
+
+ // getActiveOrCreate should recover context with checkpoint path, and recover old configuration
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+ assert(ssc != null, "no context created")
+ assert(!newContextCreated, "old context not recovered")
+ assert(ssc.conf.get("someKey") === "someValue")
+ }
+ }
+
test("multiple streaming contexts") {
- sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+ sc = new SparkContext(
+ conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"))
ssc = new StreamingContext(sc, Seconds(1))
val input = addInputStream(ssc)
input.foreachRDD { rdd => rdd.count }
@@ -522,9 +660,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
def createValidCheckpoint(): String = {
val testDirectory = Utils.createTempDir().getAbsolutePath()
val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("someKey", "someValue")
- ssc = new StreamingContext(conf, batchDuration)
+ ssc = new StreamingContext(conf.clone.set("someKey", "someValue"), batchDuration)
ssc.checkpoint(checkpointDirectory)
ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
ssc.start()