From 034f89aaab1db95e8908432f2445d6841526efcf Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 12 Jan 2014 19:02:27 -0800
Subject: Fixed persistence logic of WindowedDStream, and fixed default
 persistence level of input streams.

---
 .../scala/org/apache/spark/streaming/DStreamGraph.scala |  2 +-
 .../org/apache/spark/streaming/StreamingContext.scala   |  5 +++--
 .../spark/streaming/api/java/JavaStreamingContext.scala |  4 ++--
 .../spark/streaming/dstream/WindowedDStream.scala       | 17 +++++++++++++----
 .../apache/spark/streaming/WindowOperationsSuite.scala  | 14 ++++++++++++++
 5 files changed, 33 insertions(+), 9 deletions(-)

(limited to 'streaming/src')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 31038a06b8..8faa79f8c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def remember(duration: Duration) {
     this.synchronized {
       if (rememberDuration != null) {
-        throw new Exception("Batch duration already set as " + batchDuration +
+        throw new Exception("Remember duration already set as " + rememberDuration +
           ". cannot set it again.")
       }
       rememberDuration = duration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee83ae902b..7b27933403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Set the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for driver
    * fault-tolerance.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
    *                  Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
   def actorStream[T: ClassTag](
       props: Props,
       name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
     ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
    * @param hostname     Hostname to connect to for receiving data
    * @param port         Port to connect to for receiving data
    * @param storageLevel Storage level to use for storing the received objects
+   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    * @tparam T           Type of the objects in the received blocks
    */
   def rawSocketStream[T: ClassTag](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index a9d605d55d..a2f0b88cb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -151,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname     Hostname to connect to for receiving data
    * @param port         Port to connect to for receiving data
    * @param storageLevel Storage level to use for storing the received objects
-   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
   def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
@@ -161,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Create a input stream from network source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
-   * lines.
+   * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname Hostname to connect to for receiving data
    * @param port Port to connect to for receiving data
    */
@@ -302,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    *
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 89c43ff935..6301772468 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
   extends DStream[T](parent.ssc) {
 
   if (!_windowDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
-      "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The window duration of windowed DStream (" + _windowDuration + ") " +
+      "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
   if (!_slideDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
-      "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+      "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
+  // Persist the parent by default, as its RDDs are going to be reused in multiple windows.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration = _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
 
   override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
+  override def persist(level: StorageLevel): DStream[T] = {
+    // Do not persist this windowed DStream itself: windowed (union-ed) RDDs share their
+    // underlying RDDs, so persisting the windowed RDDs would store numerous copies of the
+    // underlying data. Instead, control the persistence of the parent DStream.
+    parent.persist(level)
+    this
+  }
+
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 8f3c2dd86c..471c99fab4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
 
 class WindowOperationsSuite extends TestSuiteBase {
 
@@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(3)
   )
 
+  test("window - persistence level") {
+    val input = Seq(Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+    val ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = new TestInputStream[Int](ssc, input, 1)
+    val windowStream1 = inputStream.window(batchDuration * 2)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+    windowStream1.persist(StorageLevel.MEMORY_ONLY)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+    ssc.stop()
+  }
+
   // Testing naive reduceByKeyAndWindow (without invertible function)
 
   testReduceByKeyAndWindow(
-- 
cgit v1.2.3
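
To illustrate the behavior this patch establishes, below is a minimal, hypothetical sketch
against the public Spark Streaming API; the socket source, host/port, durations, and object
name are placeholders and are not part of the patch. Constructing a window persists the
parent DStream at MEMORY_ONLY_SER, and a later persist() on the windowed stream is forwarded
to that parent rather than caching the overlapping window RDDs themselves.

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object WindowPersistSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("WindowPersistSketch")
      val ssc = new StreamingContext(conf, Seconds(1))

      // Any input DStream behaves the same way; a socket source is used here for brevity.
      val lines = ssc.socketTextStream("localhost", 9999)

      // Constructing the window persists the *parent* at MEMORY_ONLY_SER, since each
      // parent RDD is reused by several overlapping windows.
      val windowed = lines.window(Seconds(10), Seconds(2))

      // With this patch, persist() on the windowed DStream is forwarded to the parent
      // instead of caching the window's union RDDs, which would otherwise store every
      // record once per window it falls into.
      windowed.persist(StorageLevel.MEMORY_ONLY)

      windowed.count().print()
      ssc.start()
      ssc.awaitTermination()
    }
  }

This mirrors what the new "window - persistence level" test asserts: the windowed stream's
own storageLevel stays NONE, while the parent's level changes to whatever was requested.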