author     Tathagata Das <tathagata.das1565@gmail.com>   2015-04-29 13:10:31 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-04-29 13:10:31 -0700
commit     a9c4e29950a14e32acaac547e9a0e8879fd37fc9 (patch)
tree       db3c3d075bf52c74a5704f4aaa768aed592966fb /streaming/src/test
parent     1868bd40dcce23990b98748b0239bd00452b1ca5 (diff)
[SPARK-6752] [STREAMING] [REOPENED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
Original PR #5428 got reverted due to issues between MutableBoolean and Hadoop 1.0.4 (see JIRA). This replaces MutableBoolean with AtomicBoolean.

srowen pwendell

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5773 from tdas/SPARK-6752 and squashes the following commits:

a0c0ead [Tathagata Das] Fix for hadoop 1.0.4
70ae85b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext
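For orientation, the overloads added by this patch let a driver recover a StreamingContext from a checkpoint directory while reusing an already-running SparkContext. A minimal sketch of that idiom, mirroring the calls exercised by the tests in this diff (the checkpoint path, master, and app name are illustrative, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("example"))

    // Factory invoked only when no usable checkpoint exists.
    def createContext(sparkContext: SparkContext): StreamingContext = {
      val ssc = new StreamingContext(sparkContext, Seconds(1))
      ssc.checkpoint("/tmp/example-checkpoint")  // illustrative path
      // ... define input DStreams and output operations here ...
      ssc
    }

    // Recovers from the checkpoint if one is present; otherwise calls createContext
    // with the existing SparkContext instead of building a new one.
    val ssc = StreamingContext.getOrCreate("/tmp/example-checkpoint", createContext _, sc)
    ssc.start()
    ssc.awaitTermination()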
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java            145
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala         3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  159
3 files changed, 268 insertions(+), 39 deletions(-)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 90340753a4..b1adf881dd 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,11 +21,13 @@ import java.io.*;
import java.lang.Iterable;
import java.nio.charset.Charset;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
import scala.Tuple2;
import org.junit.Assert;
@@ -45,6 +47,7 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
+import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -929,7 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
return in.swap();
}
- });
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -987,12 +990,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaDStream<Integer> reversed = pairStream.map(
- new Function<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<String, Integer> in) throws Exception {
- return in._2();
- }
- });
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
JavaTestUtils.attachTestOutputStream(reversed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1123,7 +1126,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -1144,14 +1147,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("world", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L),
- new Tuple2<String, Long>("moon", 1L)),
- Arrays.asList(
- new Tuple2<String, Long>("hello", 1L)));
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1249,17 +1252,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v: values) {
- out = out + v;
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
}
- return Optional.of(out);
- }
});
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1292,17 +1295,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v: values) {
- out = out + v;
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
}
- return Optional.of(out);
- }
}, new HashPartitioner(1), initialRDD);
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1328,7 +1331,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1707,6 +1710,74 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Utils.deleteRecursively(tempDir);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testContextGetOrCreate() throws InterruptedException {
+
+ final SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("newContext", "true");
+
+ File emptyDir = Files.createTempDir();
+ emptyDir.deleteOnExit();
+ StreamingContextSuite contextSuite = new StreamingContextSuite();
+ String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
+ String checkpointDir = contextSuite.createValidCheckpoint();
+
+ // Function to create JavaStreamingContext without any output operations
+ // (used to detect the new context)
+ final AtomicBoolean newContextCreated = new AtomicBoolean(false);
+ Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
+ public JavaStreamingContext call() {
+ newContextCreated.set(true);
+ return new JavaStreamingContext(conf, Seconds.apply(1));
+ }
+ };
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop();
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration(), true);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop();
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+ new org.apache.hadoop.conf.Configuration());
+ Assert.assertTrue("old context not recovered", !newContextCreated.get());
+ ssc.stop();
+
+ // Function to create JavaStreamingContext using existing JavaSparkContext
+ // without any output operations (used to detect the new context)
+ Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
+ new Function<JavaSparkContext, JavaStreamingContext>() {
+ public JavaStreamingContext call(JavaSparkContext context) {
+ newContextCreated.set(true);
+ return new JavaStreamingContext(context, Seconds.apply(1));
+ }
+ };
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop(false);
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
+ Assert.assertTrue("new context not created", newContextCreated.get());
+ ssc.stop(false);
+
+ newContextCreated.set(false);
+ ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+ Assert.assertTrue("old context not recovered", !newContextCreated.get());
+ ssc.stop();
+ }
/* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@SuppressWarnings("unchecked")
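As the commit message notes, the functional change relative to the reverted #5428 is that the flag used above to detect whether the creating function ran is now java.util.concurrent.atomic.AtomicBoolean rather than commons-lang's MutableBoolean, which caused problems against Hadoop 1.0.4 (see the JIRA). A final reference to a mutable holder is needed because Java anonymous inner classes can only capture (effectively) final locals; a stripped-down sketch of the pattern, in Scala for brevity (names illustrative):

    import java.util.concurrent.atomic.AtomicBoolean
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("example")
    val newContextCreated = new AtomicBoolean(false)   // final reference, mutable state

    val factory: () => StreamingContext = () => {
      newContextCreated.set(true)                      // records that the factory actually ran
      new StreamingContext(conf, Seconds(1))
    }

    // After StreamingContext.getOrCreate(checkpointDir, factory), newContextCreated.get()
    // says whether a fresh context was built (true) or a checkpoint was recovered (false).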
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 54c30440a6..6b0a3f91d4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -430,9 +430,8 @@ class CheckpointSuite extends TestSuiteBase {
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
}
// Wait for a checkpoint to be written
- val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
eventually(eventuallyTimeout) {
- assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
+ assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6)
}
ssc.stop()
// Check that we shut down while the third batch was being processed
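The change above just tracks a simplified signature for Checkpoint.getCheckpointFiles: the test no longer has to build a FileSystem itself. Purely as an assumption about what the production overload likely does (its implementation is outside this diff), the FileSystem can be derived on demand from the checkpoint path, roughly:

    // Hypothetical sketch, not code from this patch.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    def getCheckpointFiles(checkpointDir: String): Seq[Path] = {
      val dir = new Path(checkpointDir)
      val fs = dir.getFileSystem(new Configuration())   // resolved from the path itself
      if (!fs.exists(dir)) return Seq.empty
      // Assumed naming convention for checkpoint files; details may differ in Spark.
      fs.listStatus(dir).map(_.getPath).filter(_.getName.startsWith("checkpoint")).toSeq
    }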
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 09440b1e79..5207b7109e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -17,8 +17,10 @@
package org.apache.spark.streaming
+import java.io.File
import java.util.concurrent.atomic.AtomicInteger
+import org.apache.commons.io.FileUtils
import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
@@ -330,6 +332,139 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
+ test("getOrCreate") {
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+ // Function to create a StreamingContext, setting a flag to indicate that a new context was created
+ var newContextCreated = false
+ def creatingFunction(): StreamingContext = {
+ newContextCreated = true
+ new StreamingContext(conf, batchDuration)
+ }
+
+ // Call ssc.stop after a body of code
+ def testGetOrCreate(body: => Unit): Unit = {
+ newContextCreated = false
+ try {
+ body
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ ssc = null
+ }
+ }
+
+ val emptyPath = Utils.createTempDir().getAbsolutePath()
+
+ // getOrCreate should create new context with empty path
+ testGetOrCreate {
+ ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ }
+
+ val corruptedCheckpointPath = createCorruptedCheckpoint()
+
+ // getOrCreate should throw exception with fake checkpoint file (createOnError is false by default)
+ intercept[Exception] {
+ ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
+ }
+
+ // getOrCreate should throw exception with fake checkpoint file and createOnError = false
+ intercept[Exception] {
+ ssc = StreamingContext.getOrCreate(
+ corruptedCheckpointPath, creatingFunction _, createOnError = false)
+ }
+
+ // getOrCreate should create new context with fake checkpoint file and createOnError = true
+ testGetOrCreate {
+ ssc = StreamingContext.getOrCreate(
+ corruptedCheckpointPath, creatingFunction _, createOnError = true)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ }
+
+ val checkpointPath = createValidCheckpoint()
+
+ // getOrCreate should recover context with checkpoint path, and recover old configuration
+ testGetOrCreate {
+ ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
+ assert(ssc != null, "no context created")
+ assert(!newContextCreated, "old context not recovered")
+ assert(ssc.conf.get("someKey") === "someValue")
+ }
+ }
+
+ test("getOrCreate with existing SparkContext") {
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
+ sc = new SparkContext(conf)
+
+ // Function to create a StreamingContext, setting a flag to indicate that a new context was created
+ var newContextCreated = false
+ def creatingFunction(sparkContext: SparkContext): StreamingContext = {
+ newContextCreated = true
+ new StreamingContext(sparkContext, batchDuration)
+ }
+
+ // Call ssc.stop(stopSparkContext = false) after a body of code
+ def testGetOrCreate(body: => Unit): Unit = {
+ newContextCreated = false
+ try {
+ body
+ } finally {
+ if (ssc != null) {
+ ssc.stop(stopSparkContext = false)
+ }
+ ssc = null
+ }
+ }
+
+ val emptyPath = Utils.createTempDir().getAbsolutePath()
+
+ // getOrCreate should create new context with empty path
+ testGetOrCreate {
+ ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+ }
+
+ val corruptedCheckpointPath = createCorruptedCheckpoint()
+
+ // getOrCreate should throw exception with fake checkpoint file (createOnError is false by default)
+ intercept[Exception] {
+ ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _, sc)
+ }
+
+ // getOrCreate should throw exception with fake checkpoint file and createOnError = false
+ intercept[Exception] {
+ ssc = StreamingContext.getOrCreate(
+ corruptedCheckpointPath, creatingFunction _, sc, createOnError = false)
+ }
+
+ // getOrCreate should create new context with fake checkpoint file and createOnError = true
+ testGetOrCreate {
+ ssc = StreamingContext.getOrCreate(
+ corruptedCheckpointPath, creatingFunction _, sc, createOnError = true)
+ assert(ssc != null, "no context created")
+ assert(newContextCreated, "new context not created")
+ assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+ }
+
+ val checkpointPath = createValidCheckpoint()
+
+ // StreamingContext.getOrCreate should recover context with checkpoint path
+ testGetOrCreate {
+ ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
+ assert(ssc != null, "no context created")
+ assert(!newContextCreated, "old context not recovered")
+ assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+ assert(!ssc.conf.contains("someKey"),
+ "recovered StreamingContext unexpectedly has old config")
+ }
+ }
+
test("DStream and generated RDD creation sites") {
testPackage.test()
}
@@ -339,6 +474,30 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val inputStream = new TestInputStream(s, input, 1)
inputStream
}
+
+ def createValidCheckpoint(): String = {
+ val testDirectory = Utils.createTempDir().getAbsolutePath()
+ val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
+ conf.set("someKey", "someValue")
+ ssc = new StreamingContext(conf, batchDuration)
+ ssc.checkpoint(checkpointDirectory)
+ ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
+ ssc.start()
+ eventually(timeout(10000 millis)) {
+ assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+ }
+ ssc.stop()
+ checkpointDirectory
+ }
+
+ def createCorruptedCheckpoint(): String = {
+ val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+ val fakeCheckpointFile = Checkpoint.checkpointFile(checkpointDirectory, Time(1000))
+ FileUtils.write(new File(fakeCheckpointFile.toString()), "blablabla")
+ assert(Checkpoint.getCheckpointFiles(checkpointDirectory).nonEmpty)
+ checkpointDirectory
+ }
}
class TestException(msg: String) extends Exception(msg)