author    Tathagata Das <tathagata.das1565@gmail.com>  2015-05-12 16:44:14 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-05-12 16:44:14 -0700
commit    00e7b09a0bee2fcfd0ce34992bd26435758daf26 (patch)
tree      1faf35f02885eb775eb75999e524c4ece7be98d4 /streaming
parent    96c4846db89802f5a81dca5dcfa3f2a0f72b5cb8 (diff)
[SPARK-7553] [STREAMING] Added methods to maintain a singleton StreamingContext
In a REPL/notebook environment, it is very easy to lose the reference to a StreamingContext by redefining the variable name. For example, if you happen to execute the following commands:
```
val ssc = new StreamingContext(...) // cmd 1
ssc.start()                         // cmd 2
...
val ssc = new StreamingContext(...) // accidentally run cmd 1 again
```
the value of ssc gets overwritten. Now you can neither start the new context (as only one context can be started), nor stop the previous context (as the reference to it is lost). Hence it is best to maintain a singleton reference to the active context, so that the reference to the active context is never lost. Since this problem arises mainly in REPL environments, it is best to add this as Experimental support in the Scala API only, so that it can be used in Scala REPLs and notebooks.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6070 from tdas/SPARK-7553 and squashes the following commits:

731c9a1 [Tathagata Das] Fixed style
a797171 [Tathagata Das] Added more unit tests
19fc70b [Tathagata Das] Added :: Experimental :: in docs
64706c9 [Tathagata Das] Fixed test
634db5d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7553
3884a25 [Tathagata Das] Fixing test bug
d37a846 [Tathagata Das] Added getActive and getActiveOrCreate
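As a hedged sketch of the REPL-safe pattern these new methods enable (the socket source, host, port, master URL, and app name below are illustrative placeholders, not taken from this patch):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Re-running this cell in a REPL is now safe: getActiveOrCreate returns the
// already-started singleton context if one exists, and only calls the
// creating function when no context is active.
val ssc = StreamingContext.getActiveOrCreate(() => {
  val conf = new SparkConf().setMaster("local[2]").setAppName("ReplDemo") // placeholder config
  val newSsc = new StreamingContext(conf, Seconds(1))
  newSsc.socketTextStream("localhost", 9999).foreachRDD(rdd => rdd.count()) // placeholder source
  newSsc
})
ssc.start()

// getActive returns Some(context) only between start() and stop().
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
```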
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala      |  61
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 152
2 files changed, 202 insertions(+), 11 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8461e90120..407cab45ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -637,8 +637,10 @@ class StreamingContext private[streaming] (
*/
object StreamingContext extends Logging {
+
/**
- * Lock that guards access to global variables that track active StreamingContext.
+ * Lock that guards activation of a StreamingContext as well as access to the singleton active
+ * StreamingContext in getActiveOrCreate().
*/
private val ACTIVATION_LOCK = new Object()
@@ -661,6 +663,18 @@ object StreamingContext extends Logging {
}
}
+ /**
+ * :: Experimental ::
+ *
+ * Get the currently active context, if there is one. Active means started but not stopped.
+ */
+ @Experimental
+ def getActive(): Option[StreamingContext] = {
+ ACTIVATION_LOCK.synchronized {
+ Option(activeContext.get())
+ }
+ }
+
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
@@ -670,6 +684,48 @@ object StreamingContext extends Logging {
}
/**
+ * :: Experimental ::
+ *
+ * Either return the "active" StreamingContext (that is, started but not stopped), or create a
+ * new StreamingContext by invoking the provided `creatingFunc`.
+ * @param creatingFunc Function to create a new StreamingContext
+ */
+ @Experimental
+ def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext = {
+ ACTIVATION_LOCK.synchronized {
+ getActive().getOrElse { creatingFunc() }
+ }
+ }
+
+ /**
+ * :: Experimental ::
+ *
+ * Either get the currently active StreamingContext (that is, started but not stopped),
+ * OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data
+ * does not exist in the provided `checkpointPath`, then create a new StreamingContext by calling the provided
+ * `creatingFunc`.
+ *
+ * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+ * @param creatingFunc Function to create a new StreamingContext
+ * @param hadoopConf Optional Hadoop configuration if necessary for reading from the
+ * file system
+ * @param createOnError Optional, whether to create a new StreamingContext if there is an
+ * error in reading checkpoint data. By default, an exception will be
+ * thrown on error.
+ */
+ @Experimental
+ def getActiveOrCreate(
+ checkpointPath: String,
+ creatingFunc: () => StreamingContext,
+ hadoopConf: Configuration = new Configuration(),
+ createOnError: Boolean = false
+ ): StreamingContext = {
+ ACTIVATION_LOCK.synchronized {
+ getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) }
+ }
+ }
+
+ /**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
@@ -694,7 +750,6 @@ object StreamingContext extends Logging {
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
-
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
@@ -761,7 +816,7 @@ object StreamingContext extends Logging {
): SparkContext = {
val conf = SparkContext.updatedConf(
new SparkConf(), master, appName, sparkHome, jars, environment)
- createNewSparkContext(conf)
+ new SparkContext(conf)
}
private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
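As a companion sketch for the checkpoint-aware overload added above (the checkpoint and input paths, master URL, and app name are hypothetical placeholders):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointDemo") // placeholder config
  val newSsc = new StreamingContext(conf, Seconds(1))
  newSsc.checkpoint("/tmp/demo-checkpoint")                               // hypothetical path
  newSsc.textFileStream("/tmp/demo-input").foreachRDD(rdd => rdd.count()) // hypothetical path
  newSsc
}

// Resolution order: (1) return the active context if there is one, (2) otherwise
// recover from checkpoint data, (3) otherwise call createContext().
// createOnError = true falls back to (3) if the checkpoint data is unreadable.
val ssc = StreamingContext.getActiveOrCreate(
  "/tmp/demo-checkpoint", createContext _, createOnError = true)
ssc.start()
```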
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 47299513de..5d09b234f7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -41,6 +41,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
var sc: SparkContext = null
var ssc: StreamingContext = null
@@ -390,23 +391,23 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(newContextCreated, "new context not created")
}
- val corrutedCheckpointPath = createCorruptedCheckpoint()
+ val corruptedCheckpointPath = createCorruptedCheckpoint()
// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
}
// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, createOnError = false)
+ corruptedCheckpointPath, creatingFunction _, createOnError = false)
}
// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, createOnError = true)
+ corruptedCheckpointPath, creatingFunction _, createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@@ -491,8 +492,145 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
+ test("getActive and getActiveOrCreate") {
+ require(StreamingContext.getActive().isEmpty, "context exists from before")
+ sc = new SparkContext(conf)
+
+ var newContextCreated = false
+
+ def creatingFunc(): StreamingContext = {
+ newContextCreated = true
+ val newSsc = new StreamingContext(sc, batchDuration)
+ val input = addInputStream(newSsc)
+ input.foreachRDD { rdd => rdd.count }
+ newSsc
+ }
+
+ def testGetActiveOrCreate(body: => Unit): Unit = {
+ newContextCreated = false
+ try {
+ body
+ } finally {
+
+ if (ssc != null) {
+ ssc.stop(stopSparkContext = false)
+ }
+ ssc = null
+ }
+ }
+
+ // getActiveOrCreate should create new context and getActive should return it only
+ // after starting the context
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(creatingFunc _)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated === true, "new context not created")
+ assert(StreamingContext.getActive().isEmpty,
+ "new initialized context returned before starting")
+ ssc.start()
+ assert(StreamingContext.getActive() === Some(ssc),
+ "active context not returned")
+ assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
+ "active context not returned")
+ ssc.stop()
+ assert(StreamingContext.getActive().isEmpty,
+ "inactive context returned")
+ assert(StreamingContext.getActiveOrCreate(creatingFunc _) !== ssc,
+ "inactive context returned")
+ }
+
+ // getActiveOrCreate and getActive should return independently created context after activating
+ testGetActiveOrCreate {
+ ssc = creatingFunc() // Create
+ assert(StreamingContext.getActive().isEmpty,
+ "new initialized context returned before starting")
+ ssc.start()
+ assert(StreamingContext.getActive() === Some(ssc),
+ "active context not returned")
+ assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
+ "active context not returned")
+ ssc.stop()
+ assert(StreamingContext.getActive().isEmpty,
+ "inactive context returned")
+ }
+ }
+
+ test("getActiveOrCreate with checkpoint") {
+ // Function to create a StreamingContext whose config identifies it as a new, non-recovered context
+ var newContextCreated = false
+ def creatingFunction(): StreamingContext = {
+ newContextCreated = true
+ new StreamingContext(conf, batchDuration)
+ }
+
+ // Call ssc.stop after a body of code
+ def testGetActiveOrCreate(body: => Unit): Unit = {
+ require(StreamingContext.getActive().isEmpty) // no active context
+ newContextCreated = false
+ try {
+ body
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ ssc = null
+ }
+ }
+
+ val emptyPath = Utils.createTempDir().getAbsolutePath()
+ val corruptedCheckpointPath = createCorruptedCheckpoint()
+ val checkpointPath = createValidCheckpoint()
+
+ // getActiveOrCreate should return the current active context if there is one
+ testGetActiveOrCreate {
+ ssc = new StreamingContext(
+ conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
+ addInputStream(ssc).register()
+ ssc.start()
+ val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+ assert(!newContextCreated, "new context created instead of returning")
+ assert(returnedSsc.eq(ssc), "returned context is not the activated context")
+ }
+
+ // getActiveOrCreate should create new context with empty path
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ }
+
+ // getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false
+ intercept[Exception] {
+ ssc = StreamingContext.getActiveOrCreate(corruptedCheckpointPath, creatingFunction _)
+ }
+
+ // getActiveOrCreate should throw exception with fake checkpoint file
+ intercept[Exception] {
+ ssc = StreamingContext.getActiveOrCreate(
+ corruptedCheckpointPath, creatingFunction _, createOnError = false)
+ }
+
+ // getActiveOrCreate should create new context with fake
+ // checkpoint file and createOnError = true
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(
+ corruptedCheckpointPath, creatingFunction _, createOnError = true)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ }
+
+ // getActiveOrCreate should recover context with checkpoint path, and recover old configuration
+ testGetActiveOrCreate {
+ ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+ assert(ssc != null, "no context created")
+ assert(!newContextCreated, "old context not recovered")
+ assert(ssc.conf.get("someKey") === "someValue")
+ }
+ }
+
test("multiple streaming contexts") {
- sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+ sc = new SparkContext(
+ conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"))
ssc = new StreamingContext(sc, Seconds(1))
val input = addInputStream(ssc)
input.foreachRDD { rdd => rdd.count }
@@ -522,9 +660,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
def createValidCheckpoint(): String = {
val testDirectory = Utils.createTempDir().getAbsolutePath()
val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("someKey", "someValue")
- ssc = new StreamingContext(conf, batchDuration)
+ ssc = new StreamingContext(conf.clone.set("someKey", "someValue"), batchDuration)
ssc.checkpoint(checkpointDirectory)
ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
ssc.start()