diff options
Diffstat (limited to 'streaming')
32 files changed, 76 insertions, 45 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 668e5324e6..8faa79f8c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -17,11 +17,11 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{NetworkInputDStream, InputDStream} +import scala.collection.mutable.ArrayBuffer import java.io.{ObjectInputStream, IOException, ObjectOutputStream} -import collection.mutable.ArrayBuffer import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream} final private[streaming] class DStreamGraph extends Serializable with Logging { @@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def remember(duration: Duration) { this.synchronized { if (rememberDuration != null) { - throw new Exception("Batch duration already set as " + batchDuration + + throw new Exception("Remember duration already set as " + batchDuration + ". cannot set it again.") } rememberDuration = duration diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ee83ae902b..7b27933403 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -168,7 +168,7 @@ class StreamingContext private[streaming] ( } /** - * Set the context to periodically checkpoint the DStream operations for master + * Set the context to periodically checkpoint the DStream operations for driver * fault-tolerance. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored. * Note that this must be a fault-tolerant file system like HDFS for @@ -220,7 +220,7 @@ class StreamingContext private[streaming] ( def actorStream[T: ClassTag]( props: Props, name: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy ): DStream[T] = { networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) @@ -272,6 +272,7 @@ class StreamingContext private[streaming] ( * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) * @tparam T Type of the objects in the received blocks */ def rawSocketStream[T: ClassTag]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index d29033df32..c92854ccd9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -17,13 +17,14 @@ package org.apache.spark.streaming.api.java -import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.api.java.JavaRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.DStream /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index cea4795eb5..1ec4492bca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -30,6 +30,7 @@ import org.apache.spark.api.java.function.{Function3 => JFunction3, _} import java.util import org.apache.spark.rdd.RDD import JavaDStream._ +import org.apache.spark.streaming.dstream.DStream trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 6c3467d405..6bb985ca54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel import com.google.common.base.Optional import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions +import org.apache.spark.streaming.dstream.DStream class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifest: ClassTag[K], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b4c46f5e50..a2f0b88cb0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.dstream.DStream /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -150,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel) : JavaDStream[String] = { @@ -160,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited - * lines. + * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data */ @@ -301,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream with any arbitrary user implemented actor receiver. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param props Props object defining creation of the actor * @param name Name of the actor * diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index f760093579..a7c4cca7ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -15,22 +15,23 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming.dstream -import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import scala.deprecated import scala.collection.mutable.HashMap import scala.reflect.ClassTag -import StreamingContext._ +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.scheduler.Job import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.Duration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -42,7 +43,7 @@ import org.apache.spark.util.MetadataCleaner * by a parent DStream. * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and - * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through * implicit conversions when `spark.streaming.StreamingContext._` is imported. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 671f7bbce7..2da4127f47 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -15,17 +15,15 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming.dstream -import scala.collection.mutable.{HashMap, HashSet} +import scala.collection.mutable.HashMap import scala.reflect.ClassTag - +import java.io.{ObjectInputStream, IOException} import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem - import org.apache.spark.Logging - -import java.io.{ObjectInputStream, IOException} +import org.apache.spark.streaming.Time private[streaming] class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index f10d483634..37c46b26a5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} +import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.util.TimeStampedHashMap diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index db2e0a4cee..c81534ae58 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index 244dc3ee4f..6586234554 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index 336c4b7a92..c7bb2833ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 364abcde68..905bc723f6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.streaming.scheduler.Job import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 23136f44fa..a9bb51f054 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 8f84232cab..a1075ad304 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} +import org.apache.spark.streaming.{Time, Duration, StreamingContext} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 8a04060e5b..3d8ee29df1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 0ce364fd46..7aea1f945d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index c0b7491d09..02704a8d1c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 69d80c3711..6b3e48382e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming +package org.apache.spark.streaming.dstream import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream._ @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.{Time, Duration} class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index db56345ca8..7a6b1ea35e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer -import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import org.apache.spark.streaming.{Duration, Interval, Time} import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index 84e69f277b..880a89bc36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index b34ba7b9b4..9d8889b655 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.Partitioner import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index aeea060df7..7cd4554282 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 0d84ec84f2..4ecba03ab5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -17,9 +17,8 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD -import collection.mutable.ArrayBuffer import org.apache.spark.rdd.UnionRDD import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 89c43ff935..6301772468 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag]( extends DStream[T](parent.ssc) { if (!_windowDuration.isMultipleOf(parent.slideDuration)) - throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " + + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") if (!_slideDuration.isMultipleOf(parent.slideDuration)) - throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " + + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + // Persist parent level by default, as those RDDs are going to be obviously reused. parent.persist(StorageLevel.MEMORY_ONLY_SER) def windowDuration: Duration = _windowDuration @@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag]( override def parentRememberDuration: Duration = rememberDuration + windowDuration + override def persist(level: StorageLevel): DStream[T] = { + // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying + // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data. + // Instead control the persistence of the parent DStream. + parent.persist(level) + this + } + override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 592e84791b..be67af3a64 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.util import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ForEachDStream +import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} import StreamingContext._ import scala.util.Random diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 9406e0e20a..7037aae234 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._ import util.ManualClock import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.streaming.dstream.DStream class BasicOperationsSuite extends TestSuiteBase { test("map") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 67ce5bc566..0c68c44ddb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -26,7 +26,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.SparkConf diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a477d200c9..f7f3346f81 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkException, SparkConf, SparkContext} import org.apache.spark.util.{Utils, MetadataCleaner} +import org.apache.spark.streaming.dstream.DStream class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { @@ -186,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) inputStream.map(x => { throw new TestException("error in map task"); x}) - .foreach(_.count) + .foreachRDD(_.count) val exception = intercept[Exception] { ssc.start() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index fa64142096..9e0f2c900e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.streaming.dstream.DStream class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 9b2bb57e77..535e5bd1f1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index c39abfc21b..471c99fab4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.storage.StorageLevel class WindowOperationsSuite extends TestSuiteBase { @@ -143,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase { Seconds(3) ) + test("window - persistence level") { + val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)) + val ssc = new StreamingContext(conf, batchDuration) + val inputStream = new TestInputStream[Int](ssc, input, 1) + val windowStream1 = inputStream.window(batchDuration * 2) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER) + windowStream1.persist(StorageLevel.MEMORY_ONLY) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY) + ssc.stop() + } + // Testing naive reduceByKeyAndWindow (without invertible function) testReduceByKeyAndWindow( |