diff options
author | Patrick Wendell <patrick@databricks.com> | 2015-04-25 10:37:34 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-04-25 10:37:34 -0700 |
commit | a61d65fc8b97c01be0fa756b52afdc91c46a8561 (patch) | |
tree | 08b0db2edb2dc17a49fcf0dfbfb976f4a3fcf5da /streaming/src/test/java | |
parent | cca9905b93483614b330b09b36c6526b551e17dc (diff) | |
download | spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.tar.gz spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.tar.bz2 spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.zip |
Revert "[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext"
This reverts commit 534f2a43625fbf1a3a65d09550a19875cd1dce43.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 145 |
1 files changed, 37 insertions, 108 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index cb2e8380b4..90340753a4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -22,12 +22,10 @@ import java.lang.Iterable; import java.nio.charset.Charset; import java.util.*; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - import scala.Tuple2; import org.junit.Assert; @@ -47,7 +45,6 @@ import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.*; import org.apache.spark.util.Utils; -import org.apache.spark.SparkConf; // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; @@ -932,7 +929,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { return in.swap(); } - }); + }); JavaTestUtils.attachTestOutputStream(reversed); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -990,12 +987,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaDStream<Integer> reversed = pairStream.map( - new Function<Tuple2<String, Integer>, Integer>() { - @Override - public Integer call(Tuple2<String, Integer> in) throws Exception { - return in._2(); - } - }); + new Function<Tuple2<String, Integer>, Integer>() { + @Override + public Integer call(Tuple2<String, Integer> in) throws Exception { + return in._2(); + } + }); JavaTestUtils.attachTestOutputStream(reversed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1126,7 +1123,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( new Function<Integer, Integer>() { - @Override + @Override public Integer call(Integer i) throws Exception { return i; } @@ -1147,14 +1144,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("world", 1L)), - Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("moon", 1L)), - Arrays.asList( - new Tuple2<String, Long>("hello", 1L))); + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("moon", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Long> counted = stream.countByValue(); @@ -1252,17 +1249,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; } + return Optional.of(out); + } }); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1295,17 +1292,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; } + return Optional.of(out); + } }, new HashPartitioner(1), initialRDD); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1331,7 +1328,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1710,74 +1707,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Utils.deleteRecursively(tempDir); } - @SuppressWarnings("unchecked") - @Test - public void testContextGetOrCreate() throws InterruptedException { - - final SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("newContext", "true"); - - File emptyDir = Files.createTempDir(); - emptyDir.deleteOnExit(); - StreamingContextSuite contextSuite = new StreamingContextSuite(); - String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint(); - String checkpointDir = contextSuite.createValidCheckpoint(); - - // Function to create JavaStreamingContext without any output operations - // (used to detect the new context) - final MutableBoolean newContextCreated = new MutableBoolean(false); - Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() { - public JavaStreamingContext call() { - newContextCreated.setValue(true); - return new JavaStreamingContext(conf, Seconds.apply(1)); - } - }; - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration(), true); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); - Assert.assertTrue("old context not recovered", newContextCreated.isFalse()); - ssc.stop(); - - // Function to create JavaStreamingContext using existing JavaSparkContext - // without any output operations (used to detect the new context) - Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 = - new Function<JavaSparkContext, JavaStreamingContext>() { - public JavaStreamingContext call(JavaSparkContext context) { - newContextCreated.setValue(true); - return new JavaStreamingContext(context, Seconds.apply(1)); - } - }; - - JavaSparkContext sc = new JavaSparkContext(conf); - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(false); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(false); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc); - Assert.assertTrue("old context not recovered", newContextCreated.isFalse()); - ssc.stop(); - } /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD @SuppressWarnings("unchecked") |