diff options
author | Patrick Wendell <patrick@databricks.com> | 2015-04-25 10:37:34 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-04-25 10:37:34 -0700 |
commit | a61d65fc8b97c01be0fa756b52afdc91c46a8561 (patch) | |
tree | 08b0db2edb2dc17a49fcf0dfbfb976f4a3fcf5da /streaming/src/test | |
parent | cca9905b93483614b330b09b36c6526b551e17dc (diff) | |
download | spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.tar.gz spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.tar.bz2 spark-a61d65fc8b97c01be0fa756b52afdc91c46a8561.zip |
Revert "[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext"
This reverts commit 534f2a43625fbf1a3a65d09550a19875cd1dce43.
Diffstat (limited to 'streaming/src/test')
3 files changed, 39 insertions, 268 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index cb2e8380b4..90340753a4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -22,12 +22,10 @@ import java.lang.Iterable; import java.nio.charset.Charset; import java.util.*; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - import scala.Tuple2; import org.junit.Assert; @@ -47,7 +45,6 @@ import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.*; import org.apache.spark.util.Utils; -import org.apache.spark.SparkConf; // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; @@ -932,7 +929,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { return in.swap(); } - }); + }); JavaTestUtils.attachTestOutputStream(reversed); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -990,12 +987,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaDStream<Integer> reversed = pairStream.map( - new Function<Tuple2<String, Integer>, Integer>() { - @Override - public Integer call(Tuple2<String, Integer> in) throws Exception { - return in._2(); - } - }); + new Function<Tuple2<String, Integer>, Integer>() { + @Override + public Integer call(Tuple2<String, Integer> in) throws Exception { + return in._2(); + } + }); JavaTestUtils.attachTestOutputStream(reversed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -1126,7 +1123,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( new Function<Integer, Integer>() { - @Override + @Override public Integer call(Integer i) throws Exception { return i; } @@ -1147,14 +1144,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("world", 1L)), - Arrays.asList( - new Tuple2<String, Long>("hello", 1L), - new Tuple2<String, Long>("moon", 1L)), - Arrays.asList( - new Tuple2<String, Long>("hello", 1L))); + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("moon", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Long> counted = stream.countByValue(); @@ -1252,17 +1249,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; } + return Optional.of(out); + } }); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1295,17 +1292,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; } + return Optional.of(out); + } }, new HashPartitioner(1), initialRDD); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1331,7 +1328,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> reduceWindowed = pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), - new Duration(2000), new Duration(1000)); + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1710,74 +1707,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Utils.deleteRecursively(tempDir); } - @SuppressWarnings("unchecked") - @Test - public void testContextGetOrCreate() throws InterruptedException { - - final SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("newContext", "true"); - - File emptyDir = Files.createTempDir(); - emptyDir.deleteOnExit(); - StreamingContextSuite contextSuite = new StreamingContextSuite(); - String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint(); - String checkpointDir = contextSuite.createValidCheckpoint(); - - // Function to create JavaStreamingContext without any output operations - // (used to detect the new context) - final MutableBoolean newContextCreated = new MutableBoolean(false); - Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() { - public JavaStreamingContext call() { - newContextCreated.setValue(true); - return new JavaStreamingContext(conf, Seconds.apply(1)); - } - }; - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration(), true); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc, - new org.apache.hadoop.conf.Configuration()); - Assert.assertTrue("old context not recovered", newContextCreated.isFalse()); - ssc.stop(); - - // Function to create JavaStreamingContext using existing JavaSparkContext - // without any output operations (used to detect the new context) - Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 = - new Function<JavaSparkContext, JavaStreamingContext>() { - public JavaStreamingContext call(JavaSparkContext context) { - newContextCreated.setValue(true); - return new JavaStreamingContext(context, Seconds.apply(1)); - } - }; - - JavaSparkContext sc = new JavaSparkContext(conf); - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(false); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true); - Assert.assertTrue("new context not created", newContextCreated.isTrue()); - ssc.stop(false); - - newContextCreated.setValue(false); - ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc); - Assert.assertTrue("old context not recovered", newContextCreated.isFalse()); - ssc.stop(); - } /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD @SuppressWarnings("unchecked") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6b0a3f91d4..54c30440a6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -430,8 +430,9 @@ class CheckpointSuite extends TestSuiteBase { assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3) } // Wait for a checkpoint to be written + val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) eventually(eventuallyTimeout) { - assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6) + assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6) } ssc.stop() // Check that we shut down while the third batch was being processed diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 4f193322ad..58353a5f97 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -17,10 +17,8 @@ package org.apache.spark.streaming -import java.io.File import java.util.concurrent.atomic.AtomicInteger -import org.apache.commons.io.FileUtils import org.scalatest.{Assertions, BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Timeouts import org.scalatest.concurrent.Eventually._ @@ -332,139 +330,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } - test("getOrCreate") { - val conf = new SparkConf().setMaster(master).setAppName(appName) - - // Function to create StreamingContext that has a config to identify it to be new context - var newContextCreated = false - def creatingFunction(): StreamingContext = { - newContextCreated = true - new StreamingContext(conf, batchDuration) - } - - // Call ssc.stop after a body of code - def testGetOrCreate(body: => Unit): Unit = { - newContextCreated = false - try { - body - } finally { - if (ssc != null) { - ssc.stop() - } - ssc = null - } - } - - val emptyPath = Utils.createTempDir().getAbsolutePath() - - // getOrCreate should create new context with empty path - testGetOrCreate { - ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _) - assert(ssc != null, "no context created") - assert(newContextCreated, "new context not created") - } - - val corrutedCheckpointPath = createCorruptedCheckpoint() - - // getOrCreate should throw exception with fake checkpoint file and createOnError = false - intercept[Exception] { - ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _) - } - - // getOrCreate should throw exception with fake checkpoint file - intercept[Exception] { - ssc = StreamingContext.getOrCreate( - corrutedCheckpointPath, creatingFunction _, createOnError = false) - } - - // getOrCreate should create new context with fake checkpoint file and createOnError = true - testGetOrCreate { - ssc = StreamingContext.getOrCreate( - corrutedCheckpointPath, creatingFunction _, createOnError = true) - assert(ssc != null, "no context created") - assert(newContextCreated, "new context not created") - } - - val checkpointPath = createValidCheckpoint() - - // getOrCreate should recover context with checkpoint path, and recover old configuration - testGetOrCreate { - ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _) - assert(ssc != null, "no context created") - assert(!newContextCreated, "old context not recovered") - assert(ssc.conf.get("someKey") === "someValue") - } - } - - test("getOrCreate with existing SparkContext") { - val conf = new SparkConf().setMaster(master).setAppName(appName) - sc = new SparkContext(conf) - - // Function to create StreamingContext that has a config to identify it to be new context - var newContextCreated = false - def creatingFunction(sparkContext: SparkContext): StreamingContext = { - newContextCreated = true - new StreamingContext(sparkContext, batchDuration) - } - - // Call ssc.stop(stopSparkContext = false) after a body of cody - def testGetOrCreate(body: => Unit): Unit = { - newContextCreated = false - try { - body - } finally { - if (ssc != null) { - ssc.stop(stopSparkContext = false) - } - ssc = null - } - } - - val emptyPath = Utils.createTempDir().getAbsolutePath() - - // getOrCreate should create new context with empty path - testGetOrCreate { - ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true) - assert(ssc != null, "no context created") - assert(newContextCreated, "new context not created") - assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") - } - - val corrutedCheckpointPath = createCorruptedCheckpoint() - - // getOrCreate should throw exception with fake checkpoint file and createOnError = false - intercept[Exception] { - ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc) - } - - // getOrCreate should throw exception with fake checkpoint file - intercept[Exception] { - ssc = StreamingContext.getOrCreate( - corrutedCheckpointPath, creatingFunction _, sc, createOnError = false) - } - - // getOrCreate should create new context with fake checkpoint file and createOnError = true - testGetOrCreate { - ssc = StreamingContext.getOrCreate( - corrutedCheckpointPath, creatingFunction _, sc, createOnError = true) - assert(ssc != null, "no context created") - assert(newContextCreated, "new context not created") - assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") - } - - val checkpointPath = createValidCheckpoint() - - // StreamingContext.getOrCreate should recover context with checkpoint path - testGetOrCreate { - ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc) - assert(ssc != null, "no context created") - assert(!newContextCreated, "old context not recovered") - assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext") - assert(!ssc.conf.contains("someKey"), - "recovered StreamingContext unexpectedly has old config") - } - } - test("DStream and generated RDD creation sites") { testPackage.test() } @@ -474,30 +339,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val inputStream = new TestInputStream(s, input, 1) inputStream } - - def createValidCheckpoint(): String = { - val testDirectory = Utils.createTempDir().getAbsolutePath() - val checkpointDirectory = Utils.createTempDir().getAbsolutePath() - val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("someKey", "someValue") - ssc = new StreamingContext(conf, batchDuration) - ssc.checkpoint(checkpointDirectory) - ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() } - ssc.start() - eventually(timeout(10000 millis)) { - assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1) - } - ssc.stop() - checkpointDirectory - } - - def createCorruptedCheckpoint(): String = { - val checkpointDirectory = Utils.createTempDir().getAbsolutePath() - val fakeCheckpointFile = Checkpoint.checkpointFile(checkpointDirectory, Time(1000)) - FileUtils.write(new File(fakeCheckpointFile.toString()), "blablabla") - assert(Checkpoint.getCheckpointFiles(checkpointDirectory).nonEmpty) - checkpointDirectory - } } class TestException(msg: String) extends Exception(msg) |