author	Tathagata Das <tathagata.das1565@gmail.com>	2015-04-29 13:10:31 -0700
committer	Tathagata Das <tathagata.das1565@gmail.com>	2015-04-29 13:10:31 -0700
commit	a9c4e29950a14e32acaac547e9a0e8879fd37fc9 (patch)
tree	db3c3d075bf52c74a5704f4aaa768aed592966fb /streaming/src/test/java/org/apache
parent	1868bd40dcce23990b98748b0239bd00452b1ca5 (diff)
[SPARK-6752] [STREAMING] [REOPENED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
Original PR #5428 got reverted due to issues between MutableBoolean and Hadoop 1.0.4 (see JIRA). This replaces MutableBoolean with AtomicBoolean.

srowen pwendell

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5773 from tdas/SPARK-6752 and squashes the following commits:

a0c0ead [Tathagata Das] Fix for hadoop 1.0.4
70ae85b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext
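The fix itself is a one-type swap: the flag flipped by the creating function is now java.util.concurrent.atomic.AtomicBoolean from the JDK rather than MutableBoolean from Apache Commons Lang, so no commons-lang version bundled by Hadoop 1.0.4 can interfere. A minimal sketch of the pattern follows; the class and callback are illustrative stand-ins, not code from the commit.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative stand-in for the test's "was a new context created?" flag.
    // AtomicBoolean is part of the JDK, so it carries no external dependency
    // that could conflict with the commons-lang version Hadoop 1.0.4 ships.
    public class AtomicFlagSketch {
        public static void main(String[] args) {
            final AtomicBoolean newContextCreated = new AtomicBoolean(false);

            // Stand-in for the factory passed to getOrCreate: it flips the flag
            // so the caller can tell a freshly built context from a recovered one.
            Runnable creatingFunc = new Runnable() {
                @Override
                public void run() {
                    newContextCreated.set(true);
                }
            };

            creatingFunc.run();
            System.out.println("new context created: " + newContextCreated.get());
        }
    }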
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--	streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java	| 145
1 file changed, 108 insertions(+), 37 deletions(-)
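Before the diff, a hedged usage sketch of the overloads the new test exercises: JavaStreamingContext.getOrCreate recovers a context from the checkpoint directory if one is readable and otherwise invokes the supplied factory, and the variants added by this commit take an existing JavaSparkContext so only the streaming layer is rebuilt. Signatures are taken from the test code below; the checkpoint path and app name are illustrative.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class GetOrCreateSketch {
        public static void main(String[] args) {
            final SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sketch");

            // Variant 1: a no-argument factory builds the whole context (and,
            // implicitly, its SparkContext) when no checkpoint is found.
            JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
                "/tmp/checkpoint",  // illustrative path
                new Function0<JavaStreamingContext>() {
                    public JavaStreamingContext call() {
                        return new JavaStreamingContext(conf, new Duration(1000));
                    }
                });
            ssc.stop();

            // Variant 2 (added by this commit): reuse an existing JavaSparkContext;
            // the factory receives it and layers only the streaming context on top.
            JavaSparkContext sc = new JavaSparkContext(conf);
            ssc = JavaStreamingContext.getOrCreate(
                "/tmp/checkpoint",
                new Function<JavaSparkContext, JavaStreamingContext>() {
                    public JavaStreamingContext call(JavaSparkContext context) {
                        return new JavaStreamingContext(context, new Duration(1000));
                    }
                }, sc);
            ssc.stop(false);  // false: leave the shared SparkContext running
            sc.stop();
        }
    }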
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 90340753a4..b1adf881dd 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,11 +21,13 @@ import java.io.*;
import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
import scala.Tuple2;
import org.junit.Assert;
@@ -45,6 +47,7 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
+import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -929,7 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
return in.swap();
}
- });
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -987,12 +990,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaDStream<Integer> reversed = pairStream.map(
- new Function<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<String, Integer> in) throws Exception {
- return in._2();
- }
- });
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1123,7 +1126,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -1144,14 +1147,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("moon", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L)));
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1249,17 +1252,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v: values) {
- out = out + v;
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
}
- return Optional.of(out);
- }
});
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1292,17 +1295,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v: values) {
- out = out + v;
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
}
- return Optional.of(out);
- }
}, new HashPartitioner(1), initialRDD);
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1328,7 +1331,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1707,6 +1710,74 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Utils.deleteRecursively(tempDir);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testContextGetOrCreate() throws InterruptedException {
+
+ final SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("newContext", "true");
+
+ File emptyDir = Files.createTempDir();
+ emptyDir.deleteOnExit();
+ StreamingContextSuite contextSuite = new StreamingContextSuite();
+ String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
+ String checkpointDir = contextSuite.createValidCheckpoint();
+
+ // Function to create JavaStreamingContext without any output operations
+ // (used to detect the new context)
+ final AtomicBoolean newContextCreated = new AtomicBoolean(false);
+ Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
+ public JavaStreamingContext call() {
+ newContextCreated.set(true);
+ return new JavaStreamingContext(conf, Seconds.apply(1));
+ }
+ };
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop();
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration(), true);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop();
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration());
+ Assert.assertTrue("old context not recovered", !newContextCreated.get());
+ ssc.stop();
+
+ // Function to create JavaStreamingContext using existing JavaSparkContext
+ // without any output operations (used to detect the new context)
+ Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
+ new Function<JavaSparkContext, JavaStreamingContext>() {
+ public JavaStreamingContext call(JavaSparkContext context) {
+ newContextCreated.set(true);
+ return new JavaStreamingContext(context, Seconds.apply(1));
+ }
+ };
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop(false);
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop(false);
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+ Assert.assertTrue("old context not recovered", !newContextCreated.get());
+ ssc.stop();
+ }
/* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@SuppressWarnings("unchecked")