aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-13 17:33:15 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-13 17:33:15 -0700
commitbce00dac403d3be2be59218b7b93a56c34c68f1a (patch)
treeed1f903fa2d687e522a17163dd1e8ebdf9b8f013 /streaming
parent59aaa1dad6bee06e38ee5c03bdf82354242286ee (diff)
downloadspark-bce00dac403d3be2be59218b7b93a56c34c68f1a.tar.gz
spark-bce00dac403d3be2be59218b7b93a56c34c68f1a.tar.bz2
spark-bce00dac403d3be2be59218b7b93a56c34c68f1a.zip
[SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
This is a revision of the earlier version (see #5773) that passed the active SparkContext explicitly through a new set of Java and Scala APIs. The drawbacks are: * Hard to implement in Python. * New API introduced. This is even more confusing since we are introducing getActiveOrCreate in SPARK-7553. Furthermore, there is now a direct way to get an existing active SparkContext or create a new one - SparkContext.getOrCreate(conf). It's better to use this to get the SparkContext rather than have a new API to explicitly pass the context. So in this PR I have * Removed the new versions of StreamingContext.getOrCreate() which took SparkContext * Added the ability to pick up an existing SparkContext when the StreamingContext tries to create a SparkContext. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6096 from tdas/SPARK-6752 and squashes the following commits: 53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752 f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala49
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala45
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java25
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala70
4 files changed, 9 insertions, 180 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 407cab45ed..1d2ecdd341 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
if (sc_ != null) {
sc_
} else if (isCheckpointPresent) {
- new SparkContext(cp_.createSparkConf())
+ SparkContext.getOrCreate(cp_.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
@@ -751,53 +751,6 @@ object StreamingContext extends Logging {
}
/**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
- * that the SparkConf configuration in the checkpoint data will not be restored as the
- * SparkContext has already been created.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param creatingFunc Function to create a new StreamingContext using the given SparkContext
- * @param sparkContext SparkContext using which the StreamingContext will be created
- */
- def getOrCreate(
- checkpointPath: String,
- creatingFunc: SparkContext => StreamingContext,
- sparkContext: SparkContext
- ): StreamingContext = {
- getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
- * that the SparkConf configuration in the checkpoint data will not be restored as the
- * SparkContext has already been created.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param creatingFunc Function to create a new StreamingContext using the given SparkContext
- * @param sparkContext SparkContext using which the StreamingContext will be created
- * @param createOnError Whether to create a new StreamingContext if there is an
- * error in reading checkpoint data. By default, an exception will be
- * thrown on error.
- */
- def getOrCreate(
- checkpointPath: String,
- creatingFunc: SparkContext => StreamingContext,
- sparkContext: SparkContext,
- createOnError: Boolean
- ): StreamingContext = {
- val checkpointOption = CheckpointReader.read(
- checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
- checkpointOption.map(new StreamingContext(sparkContext, _, null))
- .getOrElse(creatingFunc(sparkContext))
- }
-
- /**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8fbed2c50..b639b94d5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -805,51 +805,6 @@ object JavaStreamingContext {
}
/**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param creatingFunc Function to create a new JavaStreamingContext
- * @param sparkContext SparkContext using which the StreamingContext will be created
- */
- def getOrCreate(
- checkpointPath: String,
- creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
- sparkContext: JavaSparkContext
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
- creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
- }, sparkContext.sc)
- new JavaStreamingContext(ssc)
- }
-
- /**
- * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
- * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the provided factory
- * will be used to create a JavaStreamingContext.
- *
- * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
- * @param creatingFunc Function to create a new JavaStreamingContext
- * @param sparkContext SparkContext using which the StreamingContext will be created
- * @param createOnError Whether to create a new JavaStreamingContext if there is an
- * error in reading checkpoint data.
- */
- def getOrCreate(
- checkpointPath: String,
- creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
- sparkContext: JavaSparkContext,
- createOnError: Boolean
- ): JavaStreamingContext = {
- val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
- creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
- }, sparkContext.sc, createOnError)
- new JavaStreamingContext(ssc)
- }
-
- /**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2e00b980b9..1077b1b2cb 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1766,29 +1766,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
- // Function to create JavaStreamingContext using existing JavaSparkContext
- // without any output operations (used to detect the new context)
- Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
- new Function<JavaSparkContext, JavaStreamingContext>() {
- public JavaStreamingContext call(JavaSparkContext context) {
- newContextCreated.set(true);
- return new JavaStreamingContext(context, Seconds.apply(1));
- }
- };
-
- JavaSparkContext sc = new JavaSparkContext(conf);
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop(false);
-
newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop(false);
-
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5f93332896..4b12affbb0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
- assert(ssc.conf.get("someKey") === "someValue")
- }
- }
-
- test("getOrCreate with existing SparkContext") {
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- sc = new SparkContext(conf)
-
- // Function to create StreamingContext that has a config to identify it to be new context
- var newContextCreated = false
- def creatingFunction(sparkContext: SparkContext): StreamingContext = {
- newContextCreated = true
- new StreamingContext(sparkContext, batchDuration)
- }
-
- // Call ssc.stop(stopSparkContext = false) after a body of cody
- def testGetOrCreate(body: => Unit): Unit = {
- newContextCreated = false
- try {
- body
- } finally {
- if (ssc != null) {
- ssc.stop(stopSparkContext = false)
- }
- ssc = null
- }
- }
-
- val emptyPath = Utils.createTempDir().getAbsolutePath()
-
- // getOrCreate should create new context with empty path
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+ assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
}
- val corrutedCheckpointPath = createCorruptedCheckpoint()
-
- // getOrCreate should throw exception with fake checkpoint file and createOnError = false
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
- }
-
- // getOrCreate should throw exception with fake checkpoint file
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
- }
-
- // getOrCreate should create new context with fake checkpoint file and createOnError = true
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- }
-
- val checkpointPath = createValidCheckpoint()
-
- // StreamingContext.getOrCreate should recover context with checkpoint path
+ // getOrCreate should recover StreamingContext with existing SparkContext
testGetOrCreate {
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
+ sc = new SparkContext(conf)
+ ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- assert(!ssc.conf.contains("someKey"),
- "recovered StreamingContext unexpectedly has old config")
+ assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
}
}