author     Patrick Wendell <patrick@databricks.com>  2015-04-25 10:37:34 -0700
committer  Patrick Wendell <patrick@databricks.com>  2015-04-25 10:37:34 -0700
commit     a61d65fc8b97c01be0fa756b52afdc91c46a8561 (patch)
tree       08b0db2edb2dc17a49fcf0dfbfb976f4a3fcf5da /streaming/src/test
parent     cca9905b93483614b330b09b36c6526b551e17dc (diff)
Revert "[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext"
This reverts commit 534f2a43625fbf1a3a65d09550a19875cd1dce43.
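
For context: the reverted change let StreamingContext.getOrCreate rebuild a streaming context from a checkpoint directory while reusing an existing SparkContext. A minimal Scala sketch of the call pattern exercised by the tests removed below — the checkpoint path is a placeholder, and the overload shown taking a SparkContext is precisely what this commit removes:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)

    // Invoked only when no usable checkpoint exists at the given path.
    def creatingFunction(sparkContext: SparkContext): StreamingContext =
      new StreamingContext(sparkContext, Seconds(1))

    // Recovers the old context from the checkpoint when possible; with
    // createOnError = true, a corrupted checkpoint falls back to a new context.
    val ssc = StreamingContext.getOrCreate(
      "/path/to/checkpoint", creatingFunction _, sc, createOnError = true)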
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java            | 145
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala       |   3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 159
3 files changed, 39 insertions(+), 268 deletions(-)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index cb2e8380b4..90340753a4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -22,12 +22,10 @@ import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
-import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
import scala.Tuple2;
import org.junit.Assert;
@@ -47,7 +45,6 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
-import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -932,7 +929,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
return in.swap();
}
- });
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -990,12 +987,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaDStream<Integer> reversed = pairStream.map(
- new Function<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<String, Integer> in) throws Exception {
- return in._2();
- }
- });
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1126,7 +1123,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -1147,14 +1144,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("moon", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L)));
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1252,17 +1249,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v : values) {
- out = out + v;
- }
- return Optional.of(out);
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
}
+ return Optional.of(out);
+ }
});
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1295,17 +1292,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v : values) {
- out = out + v;
- }
- return Optional.of(out);
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
}
+ return Optional.of(out);
+ }
}, new HashPartitioner(1), initialRDD);
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1331,7 +1328,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1710,74 +1707,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Utils.deleteRecursively(tempDir);
}
- @SuppressWarnings("unchecked")
- @Test
- public void testContextGetOrCreate() throws InterruptedException {
-
- final SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("newContext", "true");
-
- File emptyDir = Files.createTempDir();
- emptyDir.deleteOnExit();
- StreamingContextSuite contextSuite = new StreamingContextSuite();
- String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
- String checkpointDir = contextSuite.createValidCheckpoint();
-
- // Function to create JavaStreamingContext without any output operations
- // (used to detect the new context)
- final MutableBoolean newContextCreated = new MutableBoolean(false);
- Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
- public JavaStreamingContext call() {
- newContextCreated.setValue(true);
- return new JavaStreamingContext(conf, Seconds.apply(1));
- }
- };
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop();
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration(), true);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop();
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new org.apache.hadoop.conf.Configuration());
- Assert.assertTrue("old context not recovered", newContextCreated.isFalse());
- ssc.stop();
-
- // Function to create JavaStreamingContext using existing JavaSparkContext
- // without any output operations (used to detect the new context)
- Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
- new Function<JavaSparkContext, JavaStreamingContext>() {
- public JavaStreamingContext call(JavaSparkContext context) {
- newContextCreated.setValue(true);
- return new JavaStreamingContext(context, Seconds.apply(1));
- }
- };
-
- JavaSparkContext sc = new JavaSparkContext(conf);
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop(false);
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
- Assert.assertTrue("new context not created", newContextCreated.isTrue());
- ssc.stop(false);
-
- newContextCreated.setValue(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
- Assert.assertTrue("old context not recovered", newContextCreated.isFalse());
- ssc.stop();
- }
/* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@SuppressWarnings("unchecked")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6b0a3f91d4..54c30440a6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -430,8 +430,9 @@ class CheckpointSuite extends TestSuiteBase {
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
}
// Wait for a checkpoint to be written
+ val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
eventually(eventuallyTimeout) {
- assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6)
+ assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
}
ssc.stop()
// Check that we shut down while the third batch was being processed
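
After the revert, Checkpoint.getCheckpointFiles again takes an explicit Hadoop FileSystem instead of resolving one internally, which is why the test above now constructs fs up front. A minimal sketch of that pattern, following the hunk above (checkpointDir and ssc as in the test, which lives in the org.apache.spark.streaming package and so can access Checkpoint directly):

    import org.apache.hadoop.fs.Path

    // Resolve the FileSystem from the checkpoint path and the running
    // context's Hadoop configuration, then list the checkpoint files.
    val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)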
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 4f193322ad..58353a5f97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -17,10 +17,8 @@
package org.apache.spark.streaming
-import java.io.File
import java.util.concurrent.atomic.AtomicInteger
-import org.apache.commons.io.FileUtils
import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
@@ -332,139 +330,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
- test("getOrCreate") {
- val conf = new SparkConf().setMaster(master).setAppName(appName)
-
- // Function to create StreamingContext that has a config to identify it to be new context
- var newContextCreated = false
- def creatingFunction(): StreamingContext = {
- newContextCreated = true
- new StreamingContext(conf, batchDuration)
- }
-
- // Call ssc.stop after a body of code
- def testGetOrCreate(body: => Unit): Unit = {
- newContextCreated = false
- try {
- body
- } finally {
- if (ssc != null) {
- ssc.stop()
- }
- ssc = null
- }
- }
-
- val emptyPath = Utils.createTempDir().getAbsolutePath()
-
- // getOrCreate should create new context with empty path
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- }
-
- val corrutedCheckpointPath = createCorruptedCheckpoint()
-
- // getOrCreate should throw exception with fake checkpoint file and createOnError = false
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _)
- }
-
- // getOrCreate should throw exception with fake checkpoint file
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, createOnError = false)
- }
-
- // getOrCreate should create new context with fake checkpoint file and createOnError = true
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- }
-
- val checkpointPath = createValidCheckpoint()
-
- // getOrCreate should recover context with checkpoint path, and recover old configuration
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
- assert(ssc != null, "no context created")
- assert(!newContextCreated, "old context not recovered")
- assert(ssc.conf.get("someKey") === "someValue")
- }
- }
-
- test("getOrCreate with existing SparkContext") {
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- sc = new SparkContext(conf)
-
- // Function to create StreamingContext that has a config to identify it to be new context
- var newContextCreated = false
- def creatingFunction(sparkContext: SparkContext): StreamingContext = {
- newContextCreated = true
- new StreamingContext(sparkContext, batchDuration)
- }
-
- // Call ssc.stop(stopSparkContext = false) after a body of cody
- def testGetOrCreate(body: => Unit): Unit = {
- newContextCreated = false
- try {
- body
- } finally {
- if (ssc != null) {
- ssc.stop(stopSparkContext = false)
- }
- ssc = null
- }
- }
-
- val emptyPath = Utils.createTempDir().getAbsolutePath()
-
- // getOrCreate should create new context with empty path
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- }
-
- val corrutedCheckpointPath = createCorruptedCheckpoint()
-
- // getOrCreate should throw exception with fake checkpoint file and createOnError = false
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
- }
-
- // getOrCreate should throw exception with fake checkpoint file
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
- }
-
- // getOrCreate should create new context with fake checkpoint file and createOnError = true
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- }
-
- val checkpointPath = createValidCheckpoint()
-
- // StreamingContext.getOrCreate should recover context with checkpoint path
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
- assert(ssc != null, "no context created")
- assert(!newContextCreated, "old context not recovered")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
- assert(!ssc.conf.contains("someKey"),
- "recovered StreamingContext unexpectedly has old config")
- }
- }
-
test("DStream and generated RDD creation sites") {
testPackage.test()
}
@@ -474,30 +339,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val inputStream = new TestInputStream(s, input, 1)
inputStream
}
-
- def createValidCheckpoint(): String = {
- val testDirectory = Utils.createTempDir().getAbsolutePath()
- val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("someKey", "someValue")
- ssc = new StreamingContext(conf, batchDuration)
- ssc.checkpoint(checkpointDirectory)
- ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
- ssc.start()
- eventually(timeout(10000 millis)) {
- assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
- }
- ssc.stop()
- checkpointDirectory
- }
-
- def createCorruptedCheckpoint(): String = {
- val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
- val fakeCheckpointFile = Checkpoint.checkpointFile(checkpointDirectory, Time(1000))
- FileUtils.write(new File(fakeCheckpointFile.toString()), "blablabla")
- assert(Checkpoint.getCheckpointFiles(checkpointDirectory).nonEmpty)
- checkpointDirectory
- }
}
class TestException(msg: String) extends Exception(msg)