aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-06-30 11:14:38 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-06-30 11:14:38 -0700
commit57264400ac7d9f9c59c387c252a9ed8d93fed4fa (patch)
tree8f1e5d4351ec0bf5aa644a28c625de119398dad1 /streaming/src/test/java/org/apache
parentca7e460f7d6fb898dc29236a85520bbe954c8a13 (diff)
downloadspark-57264400ac7d9f9c59c387c252a9ed8d93fed4fa.tar.gz
spark-57264400ac7d9f9c59c387c252a9ed8d93fed4fa.tar.bz2
spark-57264400ac7d9f9c59c387c252a9ed8d93fed4fa.zip
[SPARK-8630] [STREAMING] Prevent from checkpointing QueueInputDStream
This PR throws an exception in `QueueInputDStream.writeObject` so that it can fail the application when calling `StreamingContext.start` rather than failing it during recovering QueueInputDStream. Author: zsxwing <zsxwing@gmail.com> Closes #7016 from zsxwing/queueStream-checkpoint and squashes the following commits: 89a3d73 [zsxwing] Fix JavaAPISuite.testQueueStream cc40fd7 [zsxwing] Prevent from checkpointing QueueInputDStream
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java8
1 files changed, 8 insertions, 0 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 1077b1b2cb..a34f234758 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -364,6 +364,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
public void testQueueStream() {
+ ssc.stop();
+ // Create a new JavaStreamingContext without checkpointing
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+
List<List<Integer>> expected = Arrays.asList(
Arrays.asList(1,2,3),
Arrays.asList(4,5,6),