diff options
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 61 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 152 |
2 files changed, 202 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 8461e90120..407cab45ed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -637,8 +637,10 @@ class StreamingContext private[streaming] ( */ object StreamingContext extends Logging { + /** - * Lock that guards access to global variables that track active StreamingContext. + * Lock that guards activation of a StreamingContext as well as access to the singleton active + * StreamingContext in getActiveOrCreate(). */ private val ACTIVATION_LOCK = new Object() @@ -661,6 +663,18 @@ object StreamingContext extends Logging { } } + /** + * :: Experimental :: + * + * Get the currently active context, if there is one. Active means started but not stopped. + */ + @Experimental + def getActive(): Option[StreamingContext] = { + ACTIVATION_LOCK.synchronized { + Option(activeContext.get()) + } + } + @deprecated("Replaced by implicit functions in the DStream companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def toPairDStreamFunctions[K, V](stream: DStream[(K, V)]) @@ -670,6 +684,48 @@ object StreamingContext extends Logging { } /** + * :: Experimental :: + * + * Either return the "active" StreamingContext (that is, started but not stopped), or create a + * new StreamingContext that is + * @param creatingFunc Function to create a new StreamingContext + */ + @Experimental + def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext = { + ACTIVATION_LOCK.synchronized { + getActive().getOrElse { creatingFunc() } + } + } + + /** + * :: Experimental :: + * + * Either get the currently active StreamingContext (that is, started but not stopped), + * OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data + * does not exist in the provided, then create a new StreamingContext by calling the provided + * `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new StreamingContext + * @param hadoopConf Optional Hadoop configuration if necessary for reading from the + * file system + * @param createOnError Optional, whether to create a new StreamingContext if there is an + * error in reading checkpoint data. By default, an exception will be + * thrown on error. + */ + @Experimental + def getActiveOrCreate( + checkpointPath: String, + creatingFunc: () => StreamingContext, + hadoopConf: Configuration = new Configuration(), + createOnError: Boolean = false + ): StreamingContext = { + ACTIVATION_LOCK.synchronized { + getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) } + } + } + + /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be * recreated from the checkpoint data. If the data does not exist, then the StreamingContext @@ -694,7 +750,6 @@ object StreamingContext extends Logging { checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) } - /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be @@ -761,7 +816,7 @@ object StreamingContext extends Logging { ): SparkContext = { val conf = SparkContext.updatedConf( new SparkConf(), master, appName, sparkHome, jars, environment) - createNewSparkContext(conf) + new SparkContext(conf) } private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 47299513de..5d09b234f7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -41,6 +41,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val batchDuration = Milliseconds(500) val sparkHome = "someDir" val envPair = "key" -> "value" + val conf = new SparkConf().setMaster(master).setAppName(appName) var sc: SparkContext = null var ssc: StreamingContext = null @@ -390,23 +391,23 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(newContextCreated, "new context not created") } - val corrutedCheckpointPath = createCorruptedCheckpoint() + val corruptedCheckpointPath = createCorruptedCheckpoint() // getOrCreate should throw exception with fake checkpoint file and createOnError = false intercept[Exception] { - ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _) + ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _) } // getOrCreate should throw exception with fake checkpoint file intercept[Exception] { ssc = StreamingContext.getOrCreate( - corrutedCheckpointPath, creatingFunction _, createOnError = false) + corruptedCheckpointPath, creatingFunction _, createOnError = false) } // getOrCreate should create new context with fake checkpoint file and createOnError = true testGetOrCreate { ssc = StreamingContext.getOrCreate( - corrutedCheckpointPath, creatingFunction _, createOnError = true) + corruptedCheckpointPath, creatingFunction _, createOnError = true) assert(ssc != null, "no context created") assert(newContextCreated, "new context not created") } @@ -491,8 +492,145 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } + test("getActive and getActiveOrCreate") { + require(StreamingContext.getActive().isEmpty, "context exists from before") + sc = new SparkContext(conf) + + var newContextCreated = false + + def creatingFunc(): StreamingContext = { + newContextCreated = true + val newSsc = new StreamingContext(sc, batchDuration) + val input = addInputStream(newSsc) + input.foreachRDD { rdd => rdd.count } + newSsc + } + + def testGetActiveOrCreate(body: => Unit): Unit = { + newContextCreated = false + try { + body + } finally { + + if (ssc != null) { + ssc.stop(stopSparkContext = false) + } + ssc = null + } + } + + // getActiveOrCreate should create new context and getActive should return it only + // after starting the context + testGetActiveOrCreate { + ssc = StreamingContext.getActiveOrCreate(creatingFunc _) + assert(ssc != null, "no context created") + assert(newContextCreated === true, "new context not created") + assert(StreamingContext.getActive().isEmpty, + "new initialized context returned before starting") + ssc.start() + assert(StreamingContext.getActive() === Some(ssc), + "active context not returned") + assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc, + "active context not returned") + ssc.stop() + assert(StreamingContext.getActive().isEmpty, + "inactive context returned") + assert(StreamingContext.getActiveOrCreate(creatingFunc _) !== ssc, + "inactive context returned") + } + + // getActiveOrCreate and getActive should return independently created context after activating + testGetActiveOrCreate { + ssc = creatingFunc() // Create + assert(StreamingContext.getActive().isEmpty, + "new initialized context returned before starting") + ssc.start() + assert(StreamingContext.getActive() === Some(ssc), + "active context not returned") + assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc, + "active context not returned") + ssc.stop() + assert(StreamingContext.getActive().isEmpty, + "inactive context returned") + } + } + + test("getActiveOrCreate with checkpoint") { + // Function to create StreamingContext that has a config to identify it to be new context + var newContextCreated = false + def creatingFunction(): StreamingContext = { + newContextCreated = true + new StreamingContext(conf, batchDuration) + } + + // Call ssc.stop after a body of code + def testGetActiveOrCreate(body: => Unit): Unit = { + require(StreamingContext.getActive().isEmpty) // no active context + newContextCreated = false + try { + body + } finally { + if (ssc != null) { + ssc.stop() + } + ssc = null + } + } + + val emptyPath = Utils.createTempDir().getAbsolutePath() + val corruptedCheckpointPath = createCorruptedCheckpoint() + val checkpointPath = createValidCheckpoint() + + // getActiveOrCreate should return the current active context if there is one + testGetActiveOrCreate { + ssc = new StreamingContext( + conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration) + addInputStream(ssc).register() + ssc.start() + val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _) + assert(!newContextCreated, "new context created instead of returning") + assert(returnedSsc.eq(ssc), "returned context is not the activated context") + } + + // getActiveOrCreate should create new context with empty path + testGetActiveOrCreate { + ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _) + assert(ssc != null, "no context created") + assert(newContextCreated, "new context not created") + } + + // getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false + intercept[Exception] { + ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _) + } + + // getActiveOrCreate should throw exception with fake checkpoint file + intercept[Exception] { + ssc = StreamingContext.getActiveOrCreate( + corruptedCheckpointPath, creatingFunction _, createOnError = false) + } + + // getActiveOrCreate should create new context with fake + // checkpoint file and createOnError = true + testGetActiveOrCreate { + ssc = StreamingContext.getActiveOrCreate( + corruptedCheckpointPath, creatingFunction _, createOnError = true) + assert(ssc != null, "no context created") + assert(newContextCreated, "new context not created") + } + + // getActiveOrCreate should recover context with checkpoint path, and recover old configuration + testGetActiveOrCreate { + ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _) + assert(ssc != null, "no context created") + assert(!newContextCreated, "old context not recovered") + assert(ssc.conf.get("someKey") === "someValue") + } + } + test("multiple streaming contexts") { - sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName)) + sc = new SparkContext( + conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")) ssc = new StreamingContext(sc, Seconds(1)) val input = addInputStream(ssc) input.foreachRDD { rdd => rdd.count } @@ -522,9 +660,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w def createValidCheckpoint(): String = { val testDirectory = Utils.createTempDir().getAbsolutePath() val checkpointDirectory = Utils.createTempDir().getAbsolutePath() - val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("someKey", "someValue") - ssc = new StreamingContext(conf, batchDuration) + ssc = new StreamingContext(conf.clone.set("someKey", "someValue"), batchDuration) ssc.checkpoint(checkpointDirectory) ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() } ssc.start() |