aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-13 17:33:15 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-13 17:33:15 -0700
commitbce00dac403d3be2be59218b7b93a56c34c68f1a (patch)
treeed1f903fa2d687e522a17163dd1e8ebdf9b8f013 /streaming/src/test
parent59aaa1dad6bee06e38ee5c03bdf82354242286ee (diff)
downloadspark-bce00dac403d3be2be59218b7b93a56c34c68f1a.tar.gz
spark-bce00dac403d3be2be59218b7b93a56c34c68f1a.tar.bz2
spark-bce00dac403d3be2be59218b7b93a56c34c68f1a.zip
[SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
This is a revision of the earlier version (see #5773) that passed the active SparkContext explicitly through a new set of Java and Scala APIs. The drawbacks are: * Hard to implement in Python. * New API introduced. This is even more confusing since we are introducing getActiveOrCreate in SPARK-7553. Furthermore, there is now a direct way to get an existing active SparkContext or create a new one - SparkContext.getOrCreate(conf). It's better to use this to get the SparkContext rather than have a new API to explicitly pass the context. So in this PR I have: * Removed the new versions of StreamingContext.getOrCreate() which took a SparkContext. * Added the ability to pick up an existing SparkContext when the StreamingContext tries to create a SparkContext. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6096 from tdas/SPARK-6752 and squashes the following commits: 53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752 f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java25
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala70
2 files changed, 8 insertions, 87 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2e00b980b9..1077b1b2cb 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1766,29 +1766,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
- // Function to create JavaStreamingContext using existing JavaSparkContext
- // without any output operations (used to detect the new context)
- Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
- new Function<JavaSparkContext, JavaStreamingContext>() {
- public JavaStreamingContext call(JavaSparkContext context) {
- newContextCreated.set(true);
- return new JavaStreamingContext(context, Seconds.apply(1));
- }
- };
-
- JavaSparkContext sc = new JavaSparkContext(conf);
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop(false);
-
newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop(false);
-
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5f93332896..4b12affbb0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
- assert(ssc.conf.get("someKey") === "someValue")
- }
- }
-
- test("getOrCreate with existing SparkContext") {
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- sc = new SparkContext(conf)
-
- // Function to create StreamingContext that has a config to identify it to be new context
- var newContextCreated = false
- def creatingFunction(sparkContext: SparkContext): StreamingContext = {
- newContextCreated = true
- new StreamingContext(sparkContext, batchDuration)
- }
-
- // Call ssc.stop(stopSparkContext = false) after a body of cody
- def testGetOrCreate(body: => Unit): Unit = {
- newContextCreated = false
- try {
- body
- } finally {
- if (ssc != null) {
- ssc.stop(stopSparkContext = false)
- }
- ssc = null
- }
- }
-
- val emptyPath = Utils.createTempDir().getAbsolutePath()
-
- // getOrCreate should create new context with empty path
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+ assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
}
- val corrutedCheckpointPath = createCorruptedCheckpoint()
-
- // getOrCreate should throw exception with fake checkpoint file and createOnError = false
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
- }
-
- // getOrCreate should throw exception with fake checkpoint file
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
- }
-
- // getOrCreate should create new context with fake checkpoint file and createOnError = true
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- }
-
- val checkpointPath = createValidCheckpoint()
-
- // StreamingContext.getOrCreate should recover context with checkpoint path
+ // getOrCreate should recover StreamingContext with existing SparkContext
testGetOrCreate {
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
+ sc = new SparkContext(conf)
+ ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- assert(!ssc.conf.contains("someKey"),
- "recovered StreamingContext unexpectedly has old config")
+ assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
}
}