aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala19
-rw-r--r--streaming/src/test/scala/JavaTestUtils.scala9
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java68
3 files changed, 93 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 2833793b94..ebbc516b38 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -13,14 +13,29 @@ import java.io.InputStream
import java.util.{Map => JMap}
class JavaStreamingContext(val ssc: StreamingContext) {
- def this(master: String, frameworkName: String, batchDuration: Duration) =
- this(new StreamingContext(master, frameworkName, batchDuration))
// TODOs:
// - Test StreamingContext functions
// - Test to/from Hadoop functions
// - Support creating and registering InputStreams
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param frameworkName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(master: String, frameworkName: String, batchDuration: Duration) =
+ this(new StreamingContext(master, frameworkName, batchDuration))
+
+ /**
+ * Re-creates a StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this (new StreamingContext(path))
+
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname.
diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala
index 24ebc15e38..56349837e5 100644
--- a/streaming/src/test/scala/JavaTestUtils.scala
+++ b/streaming/src/test/scala/JavaTestUtils.scala
@@ -8,7 +8,7 @@ import java.util.ArrayList
import collection.JavaConversions._
/** Exposes streaming test functionality in a Java-friendly way. */
-object JavaTestUtils extends TestSuiteBase {
+trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
@@ -56,3 +56,10 @@ object JavaTestUtils extends TestSuiteBase {
}
}
+object JavaTestUtils extends JavaTestBase {
+
+}
+
+object JavaCheckpointTestUtils extends JavaTestBase {
+ override def actuallyWait = true
+} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 41fd9f99ff..8a63e8cd3f 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -3,6 +3,7 @@ package spark.streaming;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.io.Files;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
@@ -17,6 +18,7 @@ import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
+import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import sun.org.mozilla.javascript.annotations.JSFunction;
@@ -871,6 +873,72 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
+ @Test
+ public void testCheckpointMasterRecovery() throws InterruptedException {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("this", "is"),
+ Arrays.asList("a", "test"),
+ Arrays.asList("counting", "letters"));
+
+ List<List<Integer>> expectedInitial = Arrays.asList(
+ Arrays.asList(4,2));
+ List<List<Integer>> expectedFinal = Arrays.asList(
+ Arrays.asList(1,4),
+ Arrays.asList(8,7));
+
+
+ File tempDir = Files.createTempDir();
+ sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+
+ JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1);
+
+ assertOrderInvariantEquals(expectedInitial, initialResult);
+ Thread.sleep(1000);
+
+ sc.stop();
+ sc = new JavaStreamingContext(tempDir.getAbsolutePath());
+ sc.start();
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2);
+ assertOrderInvariantEquals(expectedFinal, finalResult);
+ }
+
+ /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @Test
+ public void testCheckpointofIndividualStream() throws InterruptedException {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("this", "is"),
+ Arrays.asList("a", "test"),
+ Arrays.asList("counting", "letters"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(4,2),
+ Arrays.asList(1,4),
+ Arrays.asList(8,7));
+
+ JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+
+ letterCount.checkpoint(new Duration(1000));
+
+ List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3);
+ assertOrderInvariantEquals(expected, result1);
+ }
+ */
+
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.