aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorPatrick Wendell <patrick@databricks.com>2015-04-25 10:37:34 -0700
committerPatrick Wendell <patrick@databricks.com>2015-04-25 10:37:34 -0700
commita61d65fc8b97c01be0fa756b52afdc91c46a8561 (patch)
tree08b0db2edb2dc17a49fcf0dfbfb976f4a3fcf5da /streaming/src/test/java
parentcca9905b93483614b330b09b36c6526b551e17dc (diff)
downloadspark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.tar.gz
spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.tar.bz2
spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.zip
Revert "[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext"
This reverts commit 534f2a43625fbf1a3a65d09550a19875cd1dce43.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java145
1 files changed, 37 insertions, 108 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index cb2e8380b4..90340753a4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -22,12 +22,10 @@ import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
-import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
import scala.Tuple2;
import org.junit.Assert;
@@ -47,7 +45,6 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
-import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -932,7 +929,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
return in.swap();
}
- });
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -990,12 +987,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaDStream<Integer> reversed = pairStream.map(
- new Function<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<String, Integer> in) throws Exception {
- return in._2();
- }
- });
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1126,7 +1123,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -1147,14 +1144,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("moon", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L)));
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1252,17 +1249,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v : values) {
- out = out + v;
- }
- return Optional.of(out);
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
}
+ return Optional.of(out);
+ }
});
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1295,17 +1292,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v : values) {
- out = out + v;
- }
- return Optional.of(out);
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
}
+ return Optional.of(out);
+ }
}, new HashPartitioner(1), initialRDD);
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1331,7 +1328,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1710,74 +1707,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Utils.deleteRecursively(tempDir);
}
- @SuppressWarnings("unchecked")
- @Test
- public void testContextGetOrCreate() throws InterruptedException {
-
- final SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("newContext", "true");
-
- File emptyDir = Files.createTempDir();
- emptyDir.deleteOnExit();
- StreamingContextSuite contextSuite = new StreamingContextSuite();
- String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
- String checkpointDir = contextSuite.createValidCheckpoint();
-
- // Function to create JavaStreamingContext without any output operations
- // (used to detect the new context)
- final MutableBoolean newContextCreated = new MutableBoolean(false);
- Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
- public JavaStreamingContext call() {
- newContextCreated.setValue(true);
- return new JavaStreamingContext(conf, Seconds.apply(1));
- }
- };
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop();
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration(), true);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop();
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration());
- Assert.assertTrue("old context not recovered", newContextCreated.isFalse());
- ssc.stop();
-
- // Function to create JavaStreamingContext using existing JavaSparkContext
- // without any output operations (used to detect the new context)
- Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
- new Function<JavaSparkContext, JavaStreamingContext>() {
- public JavaStreamingContext call(JavaSparkContext context) {
- newContextCreated.setValue(true);
- return new JavaStreamingContext(context, Seconds.apply(1));
- }
- };
-
- JavaSparkContext sc = new JavaSparkContext(conf);
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop(false);
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop(false);
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
- Assert.assertTrue("old context not recovered", newContextCreated.isFalse());
- ssc.stop();
- }
/* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@SuppressWarnings("unchecked")