author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-12 19:02:27 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-12 19:02:27 -0800
commit    034f89aaab1db95e8908432f2445d6841526efcf (patch)
tree      894aebd0f08d22f6b78c4d1849e522c50b8b6730 /streaming
parent    74d0126257838f29e3fad519b9f1a5acde88bef6 (diff)
Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                |  2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala            |  5
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |  4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala     | 17
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala       | 14
5 files changed, 33 insertions, 9 deletions
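
For context: a windowed DStream computes each of its RDDs as a union over several of the parent's RDDs (see the compute() method in the WindowedDStream diff below), so overlapping windows share the same underlying blocks, and persisting the windowed RDDs themselves would cache each parent block once per window that contains it. A minimal sketch of the kind of setup this patch targets (the local master, batch interval, and host/port are illustrative assumptions, not part of the commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("WindowPersistence")
val ssc = new StreamingContext(conf, Seconds(2))

// Receiver-based input streams store their blocks at
// StorageLevel.MEMORY_AND_DISK_SER_2 by default.
val lines = ssc.socketTextStream("localhost", 9999)

// A 10-second window sliding every 2 seconds: consecutive windows overlap
// in 4 of their 5 parent RDDs, which is why caching at the window level
// would store the same data up to five times.
val windowed = lines.window(Seconds(10), Seconds(2))
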
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 31038a06b8..8faa79f8c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
- throw new Exception("Batch duration already set as " + batchDuration +
+ throw new Exception("Remember duration already set as " + batchDuration +
". cannot set it again.")
}
rememberDuration = duration
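
The corrected message matters because DStreamGraph.remember is write-once; from application code it is reached through StreamingContext.remember. A small sketch, reusing the ssc from the first sketch above:

import org.apache.spark.streaming.Minutes

// Keep each generated RDD around for at least a minute. A second call
// now fails with "Remember duration already set ..." instead of the
// misleading "Batch duration already set ..." message.
ssc.remember(Minutes(1))
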
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee83ae902b..7b27933403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
}
/**
- * Set the context to periodically checkpoint the DStream operations for master
+ * Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
* Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
def actorStream[T: ClassTag](
props: Props,
name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @tparam T Type of the objects in the received blocks
*/
def rawSocketStream[T: ClassTag](
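
This hunk aligns actorStream's default with the other receiver-based input streams: blocks are stored serialized, replicated to two nodes, and spill to disk under memory pressure instead of being dropped. For illustration, the defaulting and explicit forms of a socket stream are equivalent (again reusing the ssc from the first sketch):

import org.apache.spark.storage.StorageLevel

// Relying on the default level:
val s1 = ssc.socketTextStream("localhost", 9999)

// The same level spelled out explicitly:
val s2 = ssc.socketTextStream("localhost", 9999,
  StorageLevel.MEMORY_AND_DISK_SER_2)
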
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index a9d605d55d..a2f0b88cb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -151,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
@@ -161,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
- * lines.
+ * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
@@ -302,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream with any arbitrary user-implemented actor receiver.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param props Props object defining creation of the actor
* @param name Name of the actor
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 89c43ff935..6301772468 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
extends DStream[T](parent.ssc) {
if (!_windowDuration.isMultipleOf(parent.slideDuration))
- throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
+ "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
if (!_slideDuration.isMultipleOf(parent.slideDuration))
- throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+ "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ // Persist the parent DStream by default, as its RDDs are obviously going to be reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
def windowDuration: Duration = _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
override def parentRememberDuration: Duration = rememberDuration + windowDuration
+ override def persist(level: StorageLevel): DStream[T] = {
+ // Do not let this windowed DStream be persisted, as windowed (union-ed) RDDs share underlying
+ // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data.
+ // Instead control the persistence of the parent DStream.
+ parent.persist(level)
+ this
+ }
+
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
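
The new persist override means user code that caches a windowed stream keeps working: the requested level is simply redirected to the parent, so each underlying block is cached exactly once. A sketch of the observable effect, continuing from the first sketch (DStream.storageLevel is not public API, which is why the direct verification lives in the test suite below rather than in user code):

import org.apache.spark.storage.StorageLevel

// Before this patch, persisting here would have cached the union-ed
// window RDDs, duplicating the shared parent blocks. Now it behaves
// like lines.persist(StorageLevel.MEMORY_ONLY), while the windowed
// stream's own level stays StorageLevel.NONE.
windowed.persist(StorageLevel.MEMORY_ONLY)
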
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 8f3c2dd86c..471c99fab4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
class WindowOperationsSuite extends TestSuiteBase {
@@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(3)
)
+ test("window - persistence level") {
+ val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+ val ssc = new StreamingContext(conf, batchDuration)
+ val inputStream = new TestInputStream[Int](ssc, input, 1)
+ val windowStream1 = inputStream.window(batchDuration * 2)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+ windowStream1.persist(StorageLevel.MEMORY_ONLY)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+ ssc.stop()
+ }
+
// Testing naive reduceByKeyAndWindow (without invertible function)
testReduceByKeyAndWindow(