aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-06-30 11:14:38 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-06-30 11:14:48 -0700
commitc83ec10cbcf2a7834da1c065a6291857386c1ae8 (patch)
treef69c6694ced655742f1018d538859d470c5e1139
parent894404cb237f8e5fc2b73ac36468f1af524a4238 (diff)
downloadspark-c83ec10cbcf2a7834da1c065a6291857386c1ae8.tar.gz
spark-c83ec10cbcf2a7834da1c065a6291857386c1ae8.tar.bz2
spark-c83ec10cbcf2a7834da1c065a6291857386c1ae8.zip
[SPARK-8630] [STREAMING] Prevent from checkpointing QueueInputDStream
This PR throws an exception in `QueueInputDStream.writeObject` so that it can fail the application when calling `StreamingContext.start` rather than failing it during recovering QueueInputDStream. Author: zsxwing <zsxwing@gmail.com> Closes #7016 from zsxwing/queueStream-checkpoint and squashes the following commits: 89a3d73 [zsxwing] Fix JavaAPISuite.testQueueStream cc40fd7 [zsxwing] Prevent from checkpointing QueueInputDStream (cherry picked from commit 57264400ac7d9f9c59c387c252a9ed8d93fed4fa) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala18
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala15
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala15
5 files changed, 56 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1708f309fc..ec49d0f42d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -477,6 +477,10 @@ class StreamingContext private[streaming] (
/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
+ *
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
@@ -491,6 +495,10 @@ class StreamingContext private[streaming] (
/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
+ *
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 989e3a729e..40deb6d7ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -419,7 +419,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create an input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE: changes to the queue after the stream is created will not be recognized.
+ * NOTE:
+ * 1. Changes to the queue after the stream is created will not be recognized.
+ * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
+ *
* @param queue Queue of RDDs
* @tparam T Type of objects in the RDD
*/
@@ -435,7 +439,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create an input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE: changes to the queue after the stream is created will not be recognized.
+ * NOTE:
+ * 1. Changes to the queue after the stream is created will not be recognized.
+ * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
+ *
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
@@ -455,7 +463,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create an input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE: changes to the queue after the stream is created will not be recognized.
+ * NOTE:
+ * 1. Changes to the queue after the stream is created will not be recognized.
+ * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
+ *
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index ed7da6dc13..a2f5d82a79 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -17,13 +17,14 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import scala.collection.mutable.Queue
-import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.streaming.{Time, StreamingContext}
+import java.io.{NotSerializableException, ObjectOutputStream}
+
+import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.reflect.ClassTag
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.streaming.{Time, StreamingContext}
+
private[streaming]
class QueueInputDStream[T: ClassTag](
@transient ssc: StreamingContext,
@@ -36,6 +37,10 @@ class QueueInputDStream[T: ClassTag](
override def stop() { }
+ private def writeObject(oos: ObjectOutputStream): Unit = {
+ throw new NotSerializableException("queueStream doesn't support checkpointing")
+ }
+
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 1077b1b2cb..a34f234758 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -364,6 +364,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
public void testQueueStream() {
+ ssc.stop();
+ // Create a new JavaStreamingContext without checkpointing
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+
List<List<Integer>> expected = Arrays.asList(
Arrays.asList(1,2,3),
Arrays.asList(4,5,6),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 819dd2ccfe..56b4ce5638 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.Queue
+
import org.apache.commons.io.FileUtils
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts
@@ -665,6 +667,19 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
transformed.foreachRDD { rdd => rdd.collect() } }
}
+ test("queueStream doesn't support checkpointing") {
+ val checkpointDir = Utils.createTempDir()
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val rdd = ssc.sparkContext.parallelize(1 to 10)
+ ssc.queueStream[Int](Queue(rdd)).print()
+ ssc.checkpoint(checkpointDir.getAbsolutePath)
+ val e = intercept[NotSerializableException] {
+ ssc.start()
+ }
+ // StreamingContext.validate changes the message, so use "contains" here
+ assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+ }
+
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)