author     Tathagata Das <tathagata.das1565@gmail.com>   2015-05-13 17:33:15 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-05-13 17:33:15 -0700
commit     bce00dac403d3be2be59218b7b93a56c34c68f1a (patch)
tree       ed1f903fa2d687e522a17163dd1e8ebdf9b8f013 /streaming/src/test/java/org/apache
parent     59aaa1dad6bee06e38ee5c03bdf82354242286ee (diff)
[SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
This is a revision of the earlier version (see #5773) that passed the active SparkContext explicitly through a new set of Java and Scala APIs. The drawbacks are:

* Hard to implement in Python.
* New API introduced. This is even more confusing since we are introducing getActiveOrCreate in SPARK-7553.

Furthermore, there is now a direct way to get an existing active SparkContext or create a new one - SparkContext.getOrCreate(conf). It's better to use this to get the SparkContext rather than have a new API to explicitly pass the context. So in this PR I have:

* Removed the new versions of StreamingContext.getOrCreate() which took a SparkContext.
* Added the ability to pick up an existing SparkContext when the StreamingContext tries to create a SparkContext.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6096 from tdas/SPARK-6752 and squashes the following commits:

53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate
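The revised recovery pattern, end to end: a minimal Java sketch, assuming the Spark 1.4-era JavaStreamingContext.getOrCreate(checkpointPath, Function0) overload. The master URL, app name, socket source, and /tmp/checkpoint path are illustrative only, not part of the commit.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class CheckpointRecoveryExample {
      public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("recovery");

        // An already-running SparkContext is now picked up automatically when
        // the StreamingContext has to build one, via SparkContext.getOrCreate(conf),
        // instead of being passed through a dedicated getOrCreate overload.
        JavaSparkContext sc = new JavaSparkContext(conf);

        final String checkpointDir = "/tmp/checkpoint";  // hypothetical path

        Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
          @Override
          public JavaStreamingContext call() {
            // Invoked only when no valid checkpoint exists; the constructor
            // reuses the active SparkContext created above.
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
            jssc.socketTextStream("localhost", 9999).print();  // some output operation
            jssc.checkpoint(checkpointDir);
            return jssc;
          }
        };

        // Recovers the context from the checkpoint if present, otherwise calls creatingFunc.
        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc);
        ssc.start();
        ssc.awaitTermination();
      }
    }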
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 25
1 file changed, 3 insertions(+), 22 deletions(-)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2e00b980b9..1077b1b2cb 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1766,29 +1766,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
- // Function to create JavaStreamingContext using existing JavaSparkContext
- // without any output operations (used to detect the new context)
- Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
- new Function<JavaSparkContext, JavaStreamingContext>() {
- public JavaStreamingContext call(JavaSparkContext context) {
- newContextCreated.set(true);
- return new JavaStreamingContext(context, Seconds.apply(1));
- }
- };
-
- JavaSparkContext sc = new JavaSparkContext(conf);
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop(false);
-
newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop(false);
-
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration());
Assert.assertTrue("old context not recovered", !newContextCreated.get());
ssc.stop();
}
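For reference, the creatingFunc passed to getOrCreate in the new hunk is defined earlier in JavaAPISuite.java and is not shown in this diff. A plausible reconstruction of its shape (not the actual source), assuming the test's conf and newContextCreated fields and org.apache.spark.api.java.function.Function0:

    // Hypothetical sketch of the creatingFunc referenced in the hunk above;
    // the actual definition appears earlier in JavaAPISuite.java.
    Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
      @Override
      public JavaStreamingContext call() {
        // Records that a brand-new context was built, so the assertions can
        // distinguish "recovered from checkpoint" from "freshly created".
        newContextCreated.set(true);
        return new JavaStreamingContext(conf, Seconds.apply(1));
      }
    };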