From 448aef6790caa3728bcc43f518afb69807597c39 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 12 Jan 2014 11:31:54 -0800
Subject: Moved DStream, DStreamCheckpointData and PairDStream from org.apache.spark.streaming to org.apache.spark.streaming.dstream.

---
 .../scala/org/apache/spark/streaming/DStream.scala | 741 --------------------
 .../spark/streaming/DStreamCheckpointData.scala | 128 ----
 .../org/apache/spark/streaming/DStreamGraph.scala | 4 +-
 .../spark/streaming/PairDStreamFunctions.scala | 621 -----------------
 .../spark/streaming/api/java/JavaDStream.scala | 3 +-
 .../spark/streaming/api/java/JavaDStreamLike.scala | 1 +
 .../spark/streaming/api/java/JavaPairDStream.scala | 1 +
 .../streaming/api/java/JavaStreamingContext.scala | 1 +
 .../apache/spark/streaming/dstream/DStream.scala | 742 +++++++++++++++++++++
 .../streaming/dstream/DStreamCheckpointData.scala | 126 ++++
 .../spark/streaming/dstream/FileInputDStream.scala | 2 +-
 .../spark/streaming/dstream/FilteredDStream.scala | 2 +-
 .../streaming/dstream/FlatMapValuedDStream.scala | 2 +-
 .../streaming/dstream/FlatMappedDStream.scala | 2 +-
 .../spark/streaming/dstream/ForEachDStream.scala | 2 +-
 .../spark/streaming/dstream/GlommedDStream.scala | 2 +-
 .../spark/streaming/dstream/InputDStream.scala | 2 +-
 .../streaming/dstream/MapPartitionedDStream.scala | 2 +-
 .../spark/streaming/dstream/MapValuedDStream.scala | 2 +-
 .../spark/streaming/dstream/MappedDStream.scala | 2 +-
 .../streaming/dstream/PairDStreamFunctions.scala | 622 +++++++++++++++++
 .../streaming/dstream/ReducedWindowedDStream.scala | 2 +-
 .../spark/streaming/dstream/ShuffledDStream.scala | 2 +-
 .../spark/streaming/dstream/StateDStream.scala | 2 +-
 .../streaming/dstream/TransformedDStream.scala | 2 +-
 .../spark/streaming/dstream/UnionDStream.scala | 3 +-
 .../spark/streaming/util/MasterFailureTest.scala | 2 +-
 .../spark/streaming/BasicOperationsSuite.scala | 1 +
 .../apache/spark/streaming/CheckpointSuite.scala | 2 +-
 .../spark/streaming/StreamingContextSuite.scala | 1 +
 .../spark/streaming/StreamingListenerSuite.scala | 1 +
 .../org/apache/spark/streaming/TestSuiteBase.scala | 2 +-
 .../spark/streaming/WindowOperationsSuite.scala | 1 +
 33 files changed, 1519 insertions(+), 1512 deletions(-)
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
(limited to 'streaming/src')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
deleted file mode 100644
index d59146e069..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ /dev/null
@@ -1,741 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -import StreamingContext._ -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner - -import scala.collection.mutable.HashMap -import scala.reflect.ClassTag - -import java.io.{ObjectInputStream, IOException, ObjectOutputStream} - - -/** - * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] - * for more details on RDDs). DStreams can either be created from live data (such as, data from - * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations - * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each - * DStream periodically generates a RDD, either from live data or by transforming the RDD generated - * by a parent DStream. - * - * This class contains the basic operations available on all DStreams, such as `map`, `filter` and - * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available - * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations - * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through - * implicit conversions when `spark.streaming.StreamingContext._` is imported. 
- * - * DStreams internally is characterized by a few basic properties: - * - A list of other DStreams that the DStream depends on - * - A time interval at which the DStream generates an RDD - * - A function that is used to generate an RDD after each time interval - */ - -abstract class DStream[T: ClassTag] ( - @transient private[streaming] var ssc: StreamingContext - ) extends Serializable with Logging { - - // ======================================================================= - // Methods that should be implemented by subclasses of DStream - // ======================================================================= - - /** Time interval after which the DStream generates a RDD */ - def slideDuration: Duration - - /** List of parent DStreams on which this DStream depends on */ - def dependencies: List[DStream[_]] - - /** Method that generates a RDD for the given time */ - def compute (validTime: Time): Option[RDD[T]] - - // ======================================================================= - // Methods and fields available on all DStreams - // ======================================================================= - - // RDDs generated, marked as private[streaming] so that testsuites can access it - @transient - private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () - - // Time zero for the DStream - private[streaming] var zeroTime: Time = null - - // Duration for which the DStream will remember each RDD created - private[streaming] var rememberDuration: Duration = null - - // Storage level of the RDDs in the stream - private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE - - // Checkpoint details - private[streaming] val mustCheckpoint = false - private[streaming] var checkpointDuration: Duration = null - private[streaming] val checkpointData = new DStreamCheckpointData(this) - - // Reference to whole DStream graph - private[streaming] var graph: DStreamGraph = null - - private[streaming] def isInitialized = (zeroTime != null) - - // Duration for which the DStream requires its parent DStream to remember each RDD created - private[streaming] def parentRememberDuration = rememberDuration - - /** Return the StreamingContext associated with this DStream */ - def context = ssc - - /** Persist the RDDs of this DStream with the given storage level */ - def persist(level: StorageLevel): DStream[T] = { - if (this.isInitialized) { - throw new UnsupportedOperationException( - "Cannot change storage level of an DStream after streaming context has started") - } - this.storageLevel = level - this - } - - /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ - def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) - - /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ - def cache(): DStream[T] = persist() - - /** - * Enable periodic checkpointing of RDDs of this DStream - * @param interval Time interval after which generated RDD will be checkpointed - */ - def checkpoint(interval: Duration): DStream[T] = { - if (isInitialized) { - throw new UnsupportedOperationException( - "Cannot change checkpoint interval of an DStream after streaming context has started") - } - persist() - checkpointDuration = interval - this - } - - /** - * Initialize the DStream by setting the "zero" time, based on which - * the validity of future times is calculated. This method also recursively initializes - * its parent DStreams. 
- */ - private[streaming] def initialize(time: Time) { - if (zeroTime != null && zeroTime != time) { - throw new Exception("ZeroTime is already initialized to " + zeroTime - + ", cannot initialize it again to " + time) - } - zeroTime = time - - // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger - if (mustCheckpoint && checkpointDuration == null) { - checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt - logInfo("Checkpoint interval automatically set to " + checkpointDuration) - } - - // Set the minimum value of the rememberDuration if not already set - var minRememberDuration = slideDuration - if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { - minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten - } - if (rememberDuration == null || rememberDuration < minRememberDuration) { - rememberDuration = minRememberDuration - } - - // Initialize the dependencies - dependencies.foreach(_.initialize(zeroTime)) - } - - private[streaming] def validate() { - assert(rememberDuration != null, "Remember duration is set to null") - - assert( - !mustCheckpoint || checkpointDuration != null, - "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + - " Please use DStream.checkpoint() to set the interval." - ) - - assert( - checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, - "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + - " or SparkContext.checkpoint() to set the checkpoint directory." - ) - - assert( - checkpointDuration == null || checkpointDuration >= slideDuration, - "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + - "Please set it to at least " + slideDuration + "." - ) - - assert( - checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), - "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + - "Please set it to a multiple " + slideDuration + "." - ) - - assert( - checkpointDuration == null || storageLevel != StorageLevel.NONE, - "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + - "level has not been set to enable persisting. Please use DStream.persist() to set the " + - "storage level to use memory for better checkpointing performance." - ) - - assert( - checkpointDuration == null || rememberDuration > checkpointDuration, - "The remember duration for " + this.getClass.getSimpleName + " has been set to " + - rememberDuration + " which is not more than the checkpoint interval (" + - checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." - ) - - val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) - logInfo("metadataCleanupDelay = " + metadataCleanerDelay) - assert( - metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, - "It seems you are doing some DStream window operation or setting a checkpoint interval " + - "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + - "than " + rememberDuration.milliseconds / 1000 + " seconds. 
But Spark's metadata cleanup" + - "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + - "set the Java property 'spark.cleaner.delay' to more than " + - math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." - ) - - dependencies.foreach(_.validate()) - - logInfo("Slide time = " + slideDuration) - logInfo("Storage level = " + storageLevel) - logInfo("Checkpoint interval = " + checkpointDuration) - logInfo("Remember duration = " + rememberDuration) - logInfo("Initialized and validated " + this) - } - - private[streaming] def setContext(s: StreamingContext) { - if (ssc != null && ssc != s) { - throw new Exception("Context is already set in " + this + ", cannot set it again") - } - ssc = s - logInfo("Set context for " + this) - dependencies.foreach(_.setContext(ssc)) - } - - private[streaming] def setGraph(g: DStreamGraph) { - if (graph != null && graph != g) { - throw new Exception("Graph is already set in " + this + ", cannot set it again") - } - graph = g - dependencies.foreach(_.setGraph(graph)) - } - - private[streaming] def remember(duration: Duration) { - if (duration != null && duration > rememberDuration) { - rememberDuration = duration - logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) - } - dependencies.foreach(_.remember(parentRememberDuration)) - } - - /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ - private[streaming] def isTimeValid(time: Time): Boolean = { - if (!isInitialized) { - throw new Exception (this + " has not been initialized") - } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { - logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) - false - } else { - logDebug("Time " + time + " is valid") - true - } - } - - /** - * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal - * method that should not be called directly. - */ - private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { - // If this DStream was not initialized (i.e., zeroTime not set), then do it - // If RDD was already generated, then retrieve it from HashMap - generatedRDDs.get(time) match { - - // If an RDD was already generated and is being reused, then - // probably all RDDs in this DStream will be reused and hence should be cached - case Some(oldRDD) => Some(oldRDD) - - // if RDD was not generated, and if the time is valid - // (based on sliding time of this DStream), then generate the RDD - case None => { - if (isTimeValid(time)) { - compute(time) match { - case Some(newRDD) => - if (storageLevel != StorageLevel.NONE) { - newRDD.persist(storageLevel) - logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) - } - if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { - newRDD.checkpoint() - logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) - } - generatedRDDs.put(time, newRDD) - Some(newRDD) - case None => - None - } - } else { - None - } - } - } - } - - /** - * Generate a SparkStreaming job for the given time. This is an internal method that - * should not be called directly. This default implementation creates a job - * that materializes the corresponding RDD. Subclasses of DStream may override this - * to generate their own jobs. 
- */ - private[streaming] def generateJob(time: Time): Option[Job] = { - getOrCompute(time) match { - case Some(rdd) => { - val jobFunc = () => { - val emptyFunc = { (iterator: Iterator[T]) => {} } - context.sparkContext.runJob(rdd, emptyFunc) - } - Some(new Job(time, jobFunc)) - } - case None => None - } - } - - /** - * Clear metadata that are older than `rememberDuration` of this DStream. - * This is an internal method that should not be called directly. This default - * implementation clears the old generated RDDs. Subclasses of DStream may override - * this to clear their own metadata along with the generated RDDs. - */ - private[streaming] def clearMetadata(time: Time) { - val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) - generatedRDDs --= oldRDDs.keys - logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + - (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) - dependencies.foreach(_.clearMetadata(time)) - } - - /* Adds metadata to the Stream while it is running. - * This method should be overwritten by sublcasses of InputDStream. - */ - private[streaming] def addMetadata(metadata: Any) { - if (metadata != null) { - logInfo("Dropping Metadata: " + metadata.toString) - } - } - - /** - * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of - * this stream. This is an internal method that should not be called directly. This is - * a default implementation that saves only the file names of the checkpointed RDDs to - * checkpointData. Subclasses of DStream (especially those of InputDStream) may override - * this method to save custom checkpoint data. - */ - private[streaming] def updateCheckpointData(currentTime: Time) { - logInfo("Updating checkpoint data for time " + currentTime) - checkpointData.update(currentTime) - dependencies.foreach(_.updateCheckpointData(currentTime)) - logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) - } - - private[streaming] def clearCheckpointData(time: Time) { - logInfo("Clearing checkpoint data") - checkpointData.cleanup(time) - dependencies.foreach(_.clearCheckpointData(time)) - logInfo("Cleared checkpoint data") - } - - /** - * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method - * that should not be called directly. This is a default implementation that recreates RDDs - * from the checkpoint file names stored in checkpointData. Subclasses of DStream that - * override the updateCheckpointData() method would also need to override this method. - */ - private[streaming] def restoreCheckpointData() { - // Create RDDs from the checkpoint data - logInfo("Restoring checkpoint data") - checkpointData.restore() - dependencies.foreach(_.restoreCheckpointData()) - logInfo("Restored checkpoint data") - } - - @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { - logDebug(this.getClass().getSimpleName + ".writeObject used") - if (graph != null) { - graph.synchronized { - if (graph.checkpointInProgress) { - oos.defaultWriteObject() - } else { - val msg = "Object of " + this.getClass.getName + " is being serialized " + - " possibly as a part of closure of an RDD operation. This is because " + - " the DStream object is being referred to from within the closure. " + - " Please rewrite the RDD operation inside this DStream to avoid this. " + - " This has been enforced to avoid bloating of Spark tasks " + - " with unnecessary objects." 
- throw new java.io.NotSerializableException(msg) - } - } - } else { - throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") - } - } - - @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { - logDebug(this.getClass().getSimpleName + ".readObject used") - ois.defaultReadObject() - generatedRDDs = new HashMap[Time, RDD[T]] () - } - - // ======================================================================= - // DStream operations - // ======================================================================= - - /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[U: ClassTag](mapFunc: T => U): DStream[U] = { - new MappedDStream(this, context.sparkContext.clean(mapFunc)) - } - - /** - * Return a new DStream by applying a function to all elements of this DStream, - * and then flattening the results - */ - def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { - new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) - } - - /** Return a new DStream containing only the elements that satisfy a predicate. */ - def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) - - /** - * Return a new DStream in which each RDD is generated by applying glom() to each RDD of - * this DStream. Applying glom() to an RDD coalesces all elements within each partition into - * an array. - */ - def glom(): DStream[Array[T]] = new GlommedDStream(this) - - - /** - * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the - * returned DStream has exactly numPartitions partitions. - */ - def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) - - /** - * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs - * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition - * of the RDD. - */ - def mapPartitions[U: ClassTag]( - mapPartFunc: Iterator[T] => Iterator[U], - preservePartitioning: Boolean = false - ): DStream[U] = { - new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing each RDD - * of this DStream. - */ - def reduce(reduceFunc: (T, T) => T): DStream[T] = - this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) - - /** - * Return a new DStream in which each RDD has a single element generated by counting each RDD - * of this DStream. - */ - def count(): DStream[Long] = { - this.map(_ => (null, 1L)) - .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) - .reduceByKey(_ + _) - .map(_._2) - } - - /** - * Return a new DStream in which each RDD contains the counts of each distinct value in - * each RDD of this DStream. Hash partitioning is used to generate - * the RDDs with `numPartitions` partitions (Spark's default number of partitions if - * `numPartitions` not specified). - */ - def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = - this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. 
- */ - def foreach(foreachFunc: RDD[T] => Unit) { - this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - */ - def foreach(foreachFunc: (RDD[T], Time) => Unit) { - ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream. - */ - def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream. - */ - def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) - val cleanedF = context.sparkContext.clean(transformFunc) - val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { - assert(rdds.length == 1) - cleanedF(rdds.head.asInstanceOf[RDD[T]], time) - } - new TransformedDStream[U](Seq(this), realTransformFunc) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream and 'other' DStream. - */ - def transformWith[U: ClassTag, V: ClassTag]( - other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] - ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) - transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream and 'other' DStream. - */ - def transformWith[U: ClassTag, V: ClassTag]( - other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] - ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) - val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { - assert(rdds.length == 2) - val rdd1 = rdds(0).asInstanceOf[RDD[T]] - val rdd2 = rdds(1).asInstanceOf[RDD[U]] - cleanedF(rdd1, rdd2, time) - } - new TransformedDStream[V](Seq(this, other), realTransformFunc) - } - - /** - * Print the first ten elements of each RDD generated in this DStream. This is an output - * operator, so this DStream will be registered as an output stream and there materialized. - */ - def print() { - def foreachFunc = (rdd: RDD[T], time: Time) => { - val first11 = rdd.take(11) - println ("-------------------------------------------") - println ("Time: " + time) - println ("-------------------------------------------") - first11.take(10).foreach(println) - if (first11.size > 10) println("...") - println() - } - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) - } - - /** - * Return a new DStream in which each RDD contains all the elements in seen in a - * sliding window of time over this DStream. The new DStream generates RDDs with - * the same interval as this DStream. - * @param windowDuration width of the window; must be a multiple of this DStream's interval. - */ - def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) - - /** - * Return a new DStream in which each RDD contains all the elements in seen in a - * sliding window of time over this DStream. 
- * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { - new WindowedDStream(this, windowDuration, slideDuration) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a sliding window over this DStream. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def reduceByWindow( - reduceFunc: (T, T) => T, - windowDuration: Duration, - slideDuration: Duration - ): DStream[T] = { - this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a sliding window over this DStream. However, the reduction is done incrementally - * using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient than reduceByWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def reduceByWindow( - reduceFunc: (T, T) => T, - invReduceFunc: (T, T) => T, - windowDuration: Duration, - slideDuration: Duration - ): DStream[T] = { - this.map(x => (1, x)) - .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) - .map(_._2) - } - - /** - * Return a new DStream in which each RDD has a single element generated by counting the number - * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { - this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) - } - - /** - * Return a new DStream in which each RDD contains the count of distinct elements in - * RDDs in a sliding window over this DStream. Hash partitioning is used to generate - * the RDDs with `numPartitions` partitions (Spark's default number of partitions if - * `numPartitions` not specified). 
- * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream. - */ - def countByValueAndWindow( - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int = ssc.sc.defaultParallelism - ): DStream[(T, Long)] = { - - this.map(x => (x, 1L)).reduceByKeyAndWindow( - (x: Long, y: Long) => x + y, - (x: Long, y: Long) => x - y, - windowDuration, - slideDuration, - numPartitions, - (x: (T, Long)) => x._2 != 0L - ) - } - - /** - * Return a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same slideDuration as this DStream. - */ - def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) - - /** - * Return all the RDDs defined by the Interval object (both end times included) - */ - def slice(interval: Interval): Seq[RDD[T]] = { - slice(interval.beginTime, interval.endTime) - } - - /** - * Return all the RDDs between 'fromTime' to 'toTime' (both included) - */ - def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { - if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") - } - if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") - } - val alignedToTime = toTime.floor(slideDuration) - val alignedFromTime = fromTime.floor(slideDuration) - - logInfo("Slicing from " + fromTime + " to " + toTime + - " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") - - alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { - if (time >= zeroTime) getOrCompute(time) else None - }) - } - - /** - * Save each RDD in this DStream as a Sequence file of serialized objects. - * The file name at each batch interval is generated based on `prefix` and - * `suffix`: "prefix-TIME_IN_MS.suffix". - */ - def saveAsObjectFiles(prefix: String, suffix: String = "") { - val saveFunc = (rdd: RDD[T], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsObjectFile(file) - } - this.foreach(saveFunc) - } - - /** - * Save each RDD in this DStream as at text file, using string representation - * of elements. The file name at each batch interval is generated based on - * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". - */ - def saveAsTextFiles(prefix: String, suffix: String = "") { - val saveFunc = (rdd: RDD[T], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsTextFile(file) - } - this.foreach(saveFunc) - } - - def register() { - ssc.registerOutputStream(this) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala deleted file mode 100644 index 671f7bbce7..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -import scala.collection.mutable.{HashMap, HashSet} -import scala.reflect.ClassTag - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.FileSystem - -import org.apache.spark.Logging - -import java.io.{ObjectInputStream, IOException} - -private[streaming] -class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) - extends Serializable with Logging { - protected val data = new HashMap[Time, AnyRef]() - - // Mapping of the batch time to the checkpointed RDD file of that time - @transient private var timeToCheckpointFile = new HashMap[Time, String] - // Mapping of the batch time to the time of the oldest checkpointed RDD - // in that batch's checkpoint data - @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] - - @transient private var fileSystem : FileSystem = null - protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] - - /** - * Updates the checkpoint data of the DStream. This gets called every time - * the graph checkpoint is initiated. Default implementation records the - * checkpoint files to which the generate RDDs of the DStream has been saved. - */ - def update(time: Time) { - - // Get the checkpointed RDDs from the generated RDDs - val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) - .map(x => (x._1, x._2.getCheckpointFile.get)) - logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - - // Add the checkpoint files to the data to be serialized - if (!checkpointFiles.isEmpty) { - currentCheckpointFiles.clear() - currentCheckpointFiles ++= checkpointFiles - // Add the current checkpoint files to the map of all checkpoint files - // This will be used to delete old checkpoint files - timeToCheckpointFile ++= currentCheckpointFiles - // Remember the time of the oldest checkpoint RDD in current state - timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) - } - } - - /** - * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been - * written to the checkpoint directory. - */ - def cleanup(time: Time) { - // Get the time of the oldest checkpointed RDD that was written as part of the - // checkpoint of `time` - timeToOldestCheckpointFileTime.remove(time) match { - case Some(lastCheckpointFileTime) => - // Find all the checkpointed RDDs (i.e. 
files) that are older than `lastCheckpointFileTime` - // This is because checkpointed RDDs older than this are not going to be needed - // even after master fails, as the checkpoint data of `time` does not refer to those files - val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) - logDebug("Files to delete:\n" + filesToDelete.mkString(",")) - filesToDelete.foreach { - case (time, file) => - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) - } - fileSystem.delete(path, true) - timeToCheckpointFile -= time - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) - fileSystem = null - } - } - case None => - logInfo("Nothing to delete") - } - } - - /** - * Restore the checkpoint data. This gets called once when the DStream graph - * (along with its DStreams) are being restored from a graph checkpoint file. - * Default implementation restores the RDDs from their checkpoint files. - */ - def restore() { - // Create RDDs from the checkpoint data - currentCheckpointFiles.foreach { - case(time, file) => { - logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") - dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) - } - } - } - - override def toString() = { - "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" - } - - @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { - ois.defaultReadObject() - timeToOldestCheckpointFileTime = new HashMap[Time, Time] - timeToCheckpointFile = new HashMap[Time, String] - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 668e5324e6..31038a06b8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -17,11 +17,11 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{NetworkInputDStream, InputDStream} +import scala.collection.mutable.ArrayBuffer import java.io.{ObjectInputStream, IOException, ObjectOutputStream} -import collection.mutable.ArrayBuffer import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream} final private[streaming] class DStreamGraph extends Serializable with Logging { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala deleted file mode 100644 index 56dbcbda23..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream._ - -import org.apache.spark.{Partitioner, HashPartitioner} -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} -import org.apache.spark.storage.StorageLevel - -import scala.collection.mutable.ArrayBuffer -import scala.reflect.{ClassTag, classTag} - -import org.apache.hadoop.mapred.{JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.conf.Configuration - -class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) -extends Serializable { - - private[streaming] def ssc = self.ssc - - private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { - new HashPartitioner(numPartitions) - } - - /** - * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to - * generate the RDDs with Spark's default number of partitions. - */ - def groupByKey(): DStream[(K, Seq[V])] = { - groupByKey(defaultPartitioner()) - } - - /** - * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to - * generate the RDDs with `numPartitions` partitions. - */ - def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { - groupByKey(defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] - * is used to control the partitioning of each RDD. - */ - def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { - val createCombiner = (v: V) => ArrayBuffer[V](v) - val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) - val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) - combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] - } - - /** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the associative reduce function. Hash partitioning is used to generate the RDDs - * with Spark's default number of partitions. - */ - def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { - reduceByKey(reduceFunc, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs - * with `numPartitions` partitions. - */ - def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { - reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. 
- */ - def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { - val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) - } - - /** - * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. - */ - def combineByKey[C: ClassTag]( - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiner: (C, C) => C, - partitioner: Partitioner, - mapSideCombine: Boolean = true): DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) - } - - /** - * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to - * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs - * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `groupByKey` over a sliding window. Similar to - * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with Spark's default number of partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * Similar to `DStream.groupByKey()`, but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream; if not specified - * then Spark's default number of partitions will be used - */ - def groupByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int - ): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) - } - - /** - * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * Similar to `DStream.groupByKey()`, but applies it over a sliding window. 
- * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. - */ - def groupByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration, - partitioner: Partitioner - ): DStream[(K, Seq[V])] = { - val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v - val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v - val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 - self.groupByKey(partitioner) - .window(windowDuration, slideDuration) - .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream - * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate - * the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration - ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to - * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration - ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to - * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with `numPartitions` partitions. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream. - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int - ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window. 
Similar to - * `DStream.reduceByKey()`, but applies it over a sliding window. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD - * in the new DStream. - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - partitioner: Partitioner - ): DStream[(K, V)] = { - val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - self.reduceByKey(cleanedReduceFunc, partitioner) - .window(windowDuration, slideDuration) - .reduceByKey(cleanedReduceFunc, partitioner) - } - - /** - * Return a new DStream by applying incremental `reduceByKey` over a sliding window. - * The reduced value of over a new window is calculated using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * - * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param filterFunc Optional function to filter expired key-value pairs; - * only pairs that satisfy the function are retained - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration = self.slideDuration, - numPartitions: Int = ssc.sc.defaultParallelism, - filterFunc: ((K, V)) => Boolean = null - ): DStream[(K, V)] = { - - reduceByKeyAndWindow( - reduceFunc, invReduceFunc, windowDuration, - slideDuration, defaultPartitioner(numPartitions), filterFunc - ) - } - - /** - * Return a new DStream by applying incremental `reduceByKey` over a sliding window. - * The reduced value of over a new window is calculated using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
- * @param filterFunc Optional function to filter expired key-value pairs; - * only pairs that satisfy the function are retained - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - partitioner: Partitioner, - filterFunc: ((K, V)) => Boolean - ): DStream[(K, V)] = { - - val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) - val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None - new ReducedWindowedDStream[K, V]( - self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, - windowDuration, slideDuration, partitioner - ) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Seq[V], Option[S]) => Option[S] - ): DStream[(K, S)] = { - updateStateByKey(updateFunc, defaultPartitioner()) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. - * @param numPartitions Number of partitions of each RDD in the new DStream. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Seq[V], Option[S]) => Option[S], - numPartitions: Int - ): DStream[(K, S)] = { - updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Seq[V], Option[S]) => Option[S], - partitioner: Partitioner - ): DStream[(K, S)] = { - val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { - iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) - } - updateStateByKey(newUpdateFunc, partitioner, true) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. Note, that - * this function may generate a different a tuple with a different key - * than the input key. It is up to the developer to decide whether to - * remember the partitioner despite the key being changed. 
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. - * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], - partitioner: Partitioner, - rememberPartitioner: Boolean - ): DStream[(K, S)] = { - new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) - } - - /** - * Return a new DStream by applying a map function to the value of each key-value pairs in - * 'this' DStream without changing the key. - */ - def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { - new MapValuedDStream[K, V, U](self, mapValuesFunc) - } - - /** - * Return a new DStream by applying a flatmap function to the value of each key-value pairs in - * 'this' DStream without changing the key. - */ - def flatMapValues[U: ClassTag]( - flatMapValuesFunc: V => TraversableOnce[U] - ): DStream[(K, U)] = { - new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) - } - - /** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number - * of partitions. - */ - def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { - cogroup(other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - */ - def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { - cogroup(other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. - */ - def cogroup[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (Seq[V], Seq[W]))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) - ) - } - - /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - */ - def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { - join[W](other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - */ - def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { - join[W](other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - */ - def join[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (V, W))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) - ) - } - - /** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default - * number of partitions. 
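A usage sketch of the simplest updateStateByKey overload above (illustration only; pairs is a hypothetical DStream[(String, Int)]). State streams are checkpointed, so the StreamingContext needs a checkpoint directory set before it is started.

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Keep a running count per key across batches; returning None would remove a key's state.
def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] = {
  pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
    Some(state.getOrElse(0) + newValues.sum)
  }
}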
- */ - def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { - leftOuterJoin[W](other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ - def leftOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - numPartitions: Int - ): DStream[(K, (V, Option[W]))] = { - leftOuterJoin[W](other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control - * the partitioning of each RDD. - */ - def leftOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (V, Option[W]))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) - ) - } - - /** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default - * number of partitions. - */ - def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { - rightOuterJoin[W](other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ - def rightOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - numPartitions: Int - ): DStream[(K, (Option[V], W))] = { - rightOuterJoin[W](other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control - * the partitioning of each RDD. - */ - def rightOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (Option[V], W))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) - ) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval - * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" - */ - def saveAsHadoopFiles[F <: OutputFormat[K, V]]( - prefix: String, - suffix: String - )(implicit fm: ClassTag[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval - * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" - */ - def saveAsHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]], - conf: JobConf = new JobConf - ) { - val saveFunc = (rdd: RDD[(K, V)], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) - } - self.foreach(saveFunc) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is - * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
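A usage sketch of the stream-to-stream leftOuterJoin defined above (illustration only; clicks and profiles are hypothetical streams keyed by user id).

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Per-batch left outer join: keys with no match in `profiles` get None on the right.
def withProfiles(
    clicks: DStream[(String, Long)],
    profiles: DStream[(String, String)]): DStream[(String, (Long, Option[String]))] = {
  clicks.leftOuterJoin(profiles)
}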
- */ - def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( - prefix: String, - suffix: String - )(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is - * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". - */ - def saveAsNewAPIHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = new Configuration - ) { - val saveFunc = (rdd: RDD[(K, V)], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) - } - self.foreach(saveFunc) - } - - private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - - private def getValueClass() = implicitly[ClassTag[V]].runtimeClass -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index d29033df32..c92854ccd9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -17,13 +17,14 @@ package org.apache.spark.streaming.api.java -import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.api.java.JavaRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.DStream /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64f38ce1c0..d3cd52ad7c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -30,6 +30,7 @@ import org.apache.spark.api.java.function.{Function3 => JFunction3, _} import java.util import org.apache.spark.rdd.RDD import JavaDStream._ +import org.apache.spark.streaming.dstream.DStream trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 6c3467d405..6bb985ca54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel import com.google.common.base.Optional import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions +import org.apache.spark.streaming.dstream.DStream class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifest: ClassTag[K], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 
ea7f7da6f3..03b422333f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.dstream.DStream /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala new file mode 100644 index 0000000000..fd72ebc3d8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -0,0 +1,742 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag + +import java.io.{ObjectInputStream, IOException, ObjectOutputStream} + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.Duration + + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. 
+ * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ + +abstract class DStream[T: ClassTag] ( + @transient private[streaming] var ssc: StreamingContext + ) extends Serializable with Logging { + + // ======================================================================= + // Methods that should be implemented by subclasses of DStream + // ======================================================================= + + /** Time interval after which the DStream generates a RDD */ + def slideDuration: Duration + + /** List of parent DStreams on which this DStream depends on */ + def dependencies: List[DStream[_]] + + /** Method that generates a RDD for the given time */ + def compute (validTime: Time): Option[RDD[T]] + + // ======================================================================= + // Methods and fields available on all DStreams + // ======================================================================= + + // RDDs generated, marked as private[streaming] so that testsuites can access it + @transient + private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () + + // Time zero for the DStream + private[streaming] var zeroTime: Time = null + + // Duration for which the DStream will remember each RDD created + private[streaming] var rememberDuration: Duration = null + + // Storage level of the RDDs in the stream + private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE + + // Checkpoint details + private[streaming] val mustCheckpoint = false + private[streaming] var checkpointDuration: Duration = null + private[streaming] val checkpointData = new DStreamCheckpointData(this) + + // Reference to whole DStream graph + private[streaming] var graph: DStreamGraph = null + + private[streaming] def isInitialized = (zeroTime != null) + + // Duration for which the DStream requires its parent DStream to remember each RDD created + private[streaming] def parentRememberDuration = rememberDuration + + /** Return the StreamingContext associated with this DStream */ + def context = ssc + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(level: StorageLevel): DStream[T] = { + if (this.isInitialized) { + throw new UnsupportedOperationException( + "Cannot change storage level of an DStream after streaming context has started") + } + this.storageLevel = level + this + } + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): DStream[T] = persist() + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ + def checkpoint(interval: Duration): DStream[T] = { + if (isInitialized) { + throw new UnsupportedOperationException( + "Cannot change checkpoint interval of an DStream after streaming context has started") + } + persist() + checkpointDuration = interval + this + } + + /** + * Initialize the DStream by setting the "zero" time, based on which + * the validity of future times is calculated. This method also recursively initializes + * its parent DStreams. 
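A brief sketch of the persist and checkpoint calls defined above (illustration only; events and the 30-second interval are assumptions). Both calls must happen before the streaming context is started, and the checkpoint interval must be a multiple of the stream's slide duration.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Keep the stream's RDDs as serialized bytes in memory and checkpoint them every 30 seconds.
def persistAndCheckpoint(events: DStream[String]): DStream[String] = {
  events.persist(StorageLevel.MEMORY_ONLY_SER)
  events.checkpoint(Seconds(30))
}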
+ */ + private[streaming] def initialize(time: Time) { + if (zeroTime != null && zeroTime != time) { + throw new Exception("ZeroTime is already initialized to " + zeroTime + + ", cannot initialize it again to " + time) + } + zeroTime = time + + // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger + if (mustCheckpoint && checkpointDuration == null) { + checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt + logInfo("Checkpoint interval automatically set to " + checkpointDuration) + } + + // Set the minimum value of the rememberDuration if not already set + var minRememberDuration = slideDuration + if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { + minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten + } + if (rememberDuration == null || rememberDuration < minRememberDuration) { + rememberDuration = minRememberDuration + } + + // Initialize the dependencies + dependencies.foreach(_.initialize(zeroTime)) + } + + private[streaming] def validate() { + assert(rememberDuration != null, "Remember duration is set to null") + + assert( + !mustCheckpoint || checkpointDuration != null, + "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + + " Please use DStream.checkpoint() to set the interval." + ) + + assert( + checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, + "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + + " or SparkContext.checkpoint() to set the checkpoint directory." + ) + + assert( + checkpointDuration == null || checkpointDuration >= slideDuration, + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + + "Please set it to at least " + slideDuration + "." + ) + + assert( + checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + + "Please set it to a multiple " + slideDuration + "." + ) + + assert( + checkpointDuration == null || storageLevel != StorageLevel.NONE, + "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + + "level has not been set to enable persisting. Please use DStream.persist() to set the " + + "storage level to use memory for better checkpointing performance." + ) + + assert( + checkpointDuration == null || rememberDuration > checkpointDuration, + "The remember duration for " + this.getClass.getSimpleName + " has been set to " + + rememberDuration + " which is not more than the checkpoint interval (" + + checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." + ) + + val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) + logInfo("metadataCleanupDelay = " + metadataCleanerDelay) + assert( + metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, + "It seems you are doing some DStream window operation or setting a checkpoint interval " + + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + + "than " + rememberDuration.milliseconds / 1000 + " seconds. 
But Spark's metadata cleanup" + + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + + "set the Java property 'spark.cleaner.delay' to more than " + + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." + ) + + dependencies.foreach(_.validate()) + + logInfo("Slide time = " + slideDuration) + logInfo("Storage level = " + storageLevel) + logInfo("Checkpoint interval = " + checkpointDuration) + logInfo("Remember duration = " + rememberDuration) + logInfo("Initialized and validated " + this) + } + + private[streaming] def setContext(s: StreamingContext) { + if (ssc != null && ssc != s) { + throw new Exception("Context is already set in " + this + ", cannot set it again") + } + ssc = s + logInfo("Set context for " + this) + dependencies.foreach(_.setContext(ssc)) + } + + private[streaming] def setGraph(g: DStreamGraph) { + if (graph != null && graph != g) { + throw new Exception("Graph is already set in " + this + ", cannot set it again") + } + graph = g + dependencies.foreach(_.setGraph(graph)) + } + + private[streaming] def remember(duration: Duration) { + if (duration != null && duration > rememberDuration) { + rememberDuration = duration + logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) + } + dependencies.foreach(_.remember(parentRememberDuration)) + } + + /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ + private[streaming] def isTimeValid(time: Time): Boolean = { + if (!isInitialized) { + throw new Exception (this + " has not been initialized") + } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) + false + } else { + logDebug("Time " + time + " is valid") + true + } + } + + /** + * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal + * method that should not be called directly. + */ + private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { + // If this DStream was not initialized (i.e., zeroTime not set), then do it + // If RDD was already generated, then retrieve it from HashMap + generatedRDDs.get(time) match { + + // If an RDD was already generated and is being reused, then + // probably all RDDs in this DStream will be reused and hence should be cached + case Some(oldRDD) => Some(oldRDD) + + // if RDD was not generated, and if the time is valid + // (based on sliding time of this DStream), then generate the RDD + case None => { + if (isTimeValid(time)) { + compute(time) match { + case Some(newRDD) => + if (storageLevel != StorageLevel.NONE) { + newRDD.persist(storageLevel) + logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) + } + if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { + newRDD.checkpoint() + logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) + } + generatedRDDs.put(time, newRDD) + Some(newRDD) + case None => + None + } + } else { + None + } + } + } + } + + /** + * Generate a SparkStreaming job for the given time. This is an internal method that + * should not be called directly. This default implementation creates a job + * that materializes the corresponding RDD. Subclasses of DStream may override this + * to generate their own jobs. 
+ */ + private[streaming] def generateJob(time: Time): Option[Job] = { + getOrCompute(time) match { + case Some(rdd) => { + val jobFunc = () => { + val emptyFunc = { (iterator: Iterator[T]) => {} } + context.sparkContext.runJob(rdd, emptyFunc) + } + Some(new Job(time, jobFunc)) + } + case None => None + } + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This default + * implementation clears the old generated RDDs. Subclasses of DStream may override + * this to clear their own metadata along with the generated RDDs. + */ + private[streaming] def clearMetadata(time: Time) { + val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) + generatedRDDs --= oldRDDs.keys + logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) + dependencies.foreach(_.clearMetadata(time)) + } + + /* Adds metadata to the Stream while it is running. + * This method should be overwritten by sublcasses of InputDStream. + */ + private[streaming] def addMetadata(metadata: Any) { + if (metadata != null) { + logInfo("Dropping Metadata: " + metadata.toString) + } + } + + /** + * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of + * this stream. This is an internal method that should not be called directly. This is + * a default implementation that saves only the file names of the checkpointed RDDs to + * checkpointData. Subclasses of DStream (especially those of InputDStream) may override + * this method to save custom checkpoint data. + */ + private[streaming] def updateCheckpointData(currentTime: Time) { + logInfo("Updating checkpoint data for time " + currentTime) + checkpointData.update(currentTime) + dependencies.foreach(_.updateCheckpointData(currentTime)) + logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) + } + + private[streaming] def clearCheckpointData(time: Time) { + logInfo("Clearing checkpoint data") + checkpointData.cleanup(time) + dependencies.foreach(_.clearCheckpointData(time)) + logInfo("Cleared checkpoint data") + } + + /** + * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method + * that should not be called directly. This is a default implementation that recreates RDDs + * from the checkpoint file names stored in checkpointData. Subclasses of DStream that + * override the updateCheckpointData() method would also need to override this method. + */ + private[streaming] def restoreCheckpointData() { + // Create RDDs from the checkpoint data + logInfo("Restoring checkpoint data") + checkpointData.restore() + dependencies.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") + } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + logDebug(this.getClass().getSimpleName + ".writeObject used") + if (graph != null) { + graph.synchronized { + if (graph.checkpointInProgress) { + oos.defaultWriteObject() + } else { + val msg = "Object of " + this.getClass.getName + " is being serialized " + + " possibly as a part of closure of an RDD operation. This is because " + + " the DStream object is being referred to from within the closure. " + + " Please rewrite the RDD operation inside this DStream to avoid this. " + + " This has been enforced to avoid bloating of Spark tasks " + + " with unnecessary objects." 
+ throw new java.io.NotSerializableException(msg) + } + } + } else { + throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[T]] () + } + + // ======================================================================= + // DStream operations + // ======================================================================= + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[U: ClassTag](mapFunc: T => U): DStream[U] = { + new MappedDStream(this, context.sparkContext.clean(mapFunc)) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { + new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) + } + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): DStream[Array[T]] = new GlommedDStream(this) + + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U: ClassTag]( + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean = false + ): DStream[U] = { + new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(reduceFunc: (T, T) => T): DStream[T] = + this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ + def count(): DStream[Long] = { + this.map(_ => (null, 1L)) + .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) + .reduceByKey(_ + _) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). + */ + def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = + this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. 
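A sketch of countByValue from the operations above (illustration only; statusCodes is a hypothetical DStream[Int]).

import org.apache.spark.streaming.dstream.DStream

// Count, within each batch, how many times every distinct status code appears.
def statusCodeCounts(statusCodes: DStream[Int]): DStream[(Int, Long)] =
  statusCodes.countByValue()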
+ */ + def foreach(foreachFunc: RDD[T] => Unit) { + this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: (RDD[T], Time) => Unit) { + ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream. + */ + def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream. + */ + def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + val cleanedF = context.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 1) + cleanedF(rdds.head.asInstanceOf[RDD[T]], time) + } + new TransformedDStream[U](Seq(this), realTransformFunc) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U: ClassTag, V: ClassTag]( + other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U: ClassTag, V: ClassTag]( + other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 2) + val rdd1 = rdds(0).asInstanceOf[RDD[T]] + val rdd2 = rdds(1).asInstanceOf[RDD[U]] + cleanedF(rdd1, rdd2, time) + } + new TransformedDStream[V](Seq(this, other), realTransformFunc) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ + def print() { + def foreachFunc = (rdd: RDD[T], time: Time) => { + val first11 = rdd.take(11) + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + first11.take(10).foreach(println) + if (first11.size > 10) println("...") + println() + } + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + } + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. The new DStream generates RDDs with + * the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + */ + def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. 
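A sketch of the per-batch transform defined above (illustration only; the spam-score RDD and the stream of clicks are hypothetical). transform lets an arbitrary RDD-to-RDD function run on every batch, here a join against a static RDD.

import org.apache.spark.SparkContext._   // pair-RDD operations used inside the transform
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Join every batch against a precomputed RDD of spam scores.
def withSpamScores(
    clicks: DStream[(String, String)],
    spamScores: RDD[(String, Double)]): DStream[(String, (String, Double))] = {
  clicks.transform(rdd => rdd.join(spamScores))
}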
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { + new WindowedDStream(this, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. However, the reduction is done incrementally + * using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + invReduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.map(x => (1, x)) + .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { + this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). 
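A sketch of countByWindow (illustration only; the 60-second window and 10-second slide are assumptions). The count is maintained incrementally via an inverse reduce, so a checkpoint directory must be configured on the context.

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Number of events seen in the last 60 seconds, recomputed every 10 seconds.
def eventRate(events: DStream[String]): DStream[Long] =
  events.countByWindow(Seconds(60), Seconds(10))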
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValueAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[(T, Long)] = { + + this.map(x => (x, 1L)).reduceByKeyAndWindow( + (x: Long, y: Long) => x + y, + (x: Long, y: Long) => x - y, + windowDuration, + slideDuration, + numPartitions, + (x: (T, Long)) => x._2 != 0L + ) + } + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same slideDuration as this DStream. + */ + def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) + + /** + * Return all the RDDs defined by the Interval object (both end times included) + */ + def slice(interval: Interval): Seq[RDD[T]] = { + slice(interval.beginTime, interval.endTime) + } + + /** + * Return all the RDDs between 'fromTime' to 'toTime' (both included) + */ + def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { + if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + val alignedToTime = toTime.floor(slideDuration) + val alignedFromTime = fromTime.floor(slideDuration) + + logInfo("Slicing from " + fromTime + " to " + toTime + + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + + alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + if (time >= zeroTime) getOrCompute(time) else None + }) + } + + /** + * Save each RDD in this DStream as a Sequence file of serialized objects. + * The file name at each batch interval is generated based on `prefix` and + * `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsObjectFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsObjectFile(file) + } + this.foreach(saveFunc) + } + + /** + * Save each RDD in this DStream as at text file, using string representation + * of elements. The file name at each batch interval is generated based on + * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsTextFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsTextFile(file) + } + this.foreach(saveFunc) + } + + def register() { + ssc.registerOutputStream(this) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala new file mode 100644 index 0000000000..2da4127f47 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag +import java.io.{ObjectInputStream, IOException} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.FileSystem +import org.apache.spark.Logging +import org.apache.spark.streaming.Time + +private[streaming] +class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) + extends Serializable with Logging { + protected val data = new HashMap[Time, AnyRef]() + + // Mapping of the batch time to the checkpointed RDD file of that time + @transient private var timeToCheckpointFile = new HashMap[Time, String] + // Mapping of the batch time to the time of the oldest checkpointed RDD + // in that batch's checkpoint data + @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] + + @transient private var fileSystem : FileSystem = null + protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] + + /** + * Updates the checkpoint data of the DStream. This gets called every time + * the graph checkpoint is initiated. Default implementation records the + * checkpoint files to which the generate RDDs of the DStream has been saved. + */ + def update(time: Time) { + + // Get the checkpointed RDDs from the generated RDDs + val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) + + // Add the checkpoint files to the data to be serialized + if (!checkpointFiles.isEmpty) { + currentCheckpointFiles.clear() + currentCheckpointFiles ++= checkpointFiles + // Add the current checkpoint files to the map of all checkpoint files + // This will be used to delete old checkpoint files + timeToCheckpointFile ++= currentCheckpointFiles + // Remember the time of the oldest checkpoint RDD in current state + timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) + } + } + + /** + * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been + * written to the checkpoint directory. + */ + def cleanup(time: Time) { + // Get the time of the oldest checkpointed RDD that was written as part of the + // checkpoint of `time` + timeToOldestCheckpointFileTime.remove(time) match { + case Some(lastCheckpointFileTime) => + // Find all the checkpointed RDDs (i.e. 
files) that are older than `lastCheckpointFileTime` + // This is because checkpointed RDDs older than this are not going to be needed + // even after master fails, as the checkpoint data of `time` does not refer to those files + val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) + logDebug("Files to delete:\n" + filesToDelete.mkString(",")) + filesToDelete.foreach { + case (time, file) => + try { + val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) + } + fileSystem.delete(path, true) + timeToCheckpointFile -= time + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + fileSystem = null + } + } + case None => + logInfo("Nothing to delete") + } + } + + /** + * Restore the checkpoint data. This gets called once when the DStream graph + * (along with its DStreams) are being restored from a graph checkpoint file. + * Default implementation restores the RDDs from their checkpoint files. + */ + def restore() { + // Create RDDs from the checkpoint data + currentCheckpointFiles.foreach { + case(time, file) => { + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") + dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) + } + } + } + + override def toString() = { + "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + timeToOldestCheckpointFileTime = new HashMap[Time, Time] + timeToCheckpointFile = new HashMap[Time, String] + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 1f0f31c4b1..012fbb0711 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} +import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.util.TimeStampedHashMap diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index db2e0a4cee..c81534ae58 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index 244dc3ee4f..6586234554 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index 336c4b7a92..c7bb2833ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 364abcde68..905bc723f6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.streaming.scheduler.Job import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 23136f44fa..a9bb51f054 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 8f84232cab..a1075ad304 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} +import org.apache.spark.streaming.{Time, Duration, StreamingContext} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 8a04060e5b..3d8ee29df1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import 
org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 0ce364fd46..7aea1f945d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index c0b7491d09..02704a8d1c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala new file mode 100644 index 0000000000..f71dd17b2f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream._ + +import org.apache.spark.{Partitioner, HashPartitioner} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{ClassTag, classTag} + +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.{Time, Duration} + +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) +extends Serializable { + + private[streaming] def ssc = self.ssc + + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + new HashPartitioner(numPartitions) + } + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { + val createCombiner = (v: V) => ArrayBuffer[V](v) + val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) + val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) + combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the + * partitioning of each RDD. + */ + def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the + * combineByKey for RDDs. 
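A usage sketch of the plain reduceByKey defined above, reached through the implicit conversion from StreamingContext (illustration only; words is a hypothetical DStream[String]).

import org.apache.spark.streaming.StreamingContext._   // implicit DStream[(K, V)] -> PairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream

// Per-batch word count: pair each word with 1, then merge counts by key.
def wordCounts(words: DStream[String]): DStream[(String, Int)] =
  words.map(w => (w, 1)).reduceByKey((a: Int, b: Int) => a + b)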
Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + */ + def combineByKey[C: ClassTag]( + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true): DStream[(K, C)] = { + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream; if not specified + * then Spark's default number of partitions will be used + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
+ */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, Seq[V])] = { + val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v + val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v + val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 + self.groupByKey(partitioner) + .window(windowDuration, slideDuration) + .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. 
+ * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD + * in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + self.reduceByKey(cleanedReduceFunc, partitioner) + .window(windowDuration, slideDuration) + .reduceByKey(cleanedReduceFunc, partitioner) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration = self.slideDuration, + numPartitions: Int = ssc.sc.defaultParallelism, + filterFunc: ((K, V)) => Boolean = null + ): DStream[(K, V)] = { + + reduceByKeyAndWindow( + reduceFunc, invReduceFunc, windowDuration, + slideDuration, defaultPartitioner(numPartitions), filterFunc + ) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
+ * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner, + filterFunc: ((K, V)) => Boolean + ): DStream[(K, V)] = { + + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) + val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None + new ReducedWindowedDStream[K, V]( + self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, + windowDuration, slideDuration, partitioner + ) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Seq[V], Option[S]) => Option[S] + ): DStream[(K, S)] = { + updateStateByKey(updateFunc, defaultPartitioner()) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Seq[V], Option[S]) => Option[S], + numPartitions: Int + ): DStream[(K, S)] = { + updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Seq[V], Option[S]) => Option[S], + partitioner: Partitioner + ): DStream[(K, S)] = { + val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { + iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) + } + updateStateByKey(newUpdateFunc, partitioner, true) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. Note, that + * this function may generate a different a tuple with a different key + * than the input key. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. 
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + rememberPartitioner: Boolean + ): DStream[(K, S)] = { + new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) + } + + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { + new MapValuedDStream[K, V, U](self, mapValuesFunc) + } + + /** + * Return a new DStream by applying a flatmap function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ + def flatMapValues[U: ClassTag]( + flatMapValuesFunc: V => TraversableOnce[U] + ): DStream[(K, U)] = { + new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number + * of partitions. + */ + def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. + */ + def cogroup[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Seq[V], Seq[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def join[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. 
+ */ + def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, Option[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Option[V], W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) + ) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassTag[F]) { + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf = new JobConf + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
+ */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassTag[F]) { + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass + + private def getValueClass() = implicitly[ClassTag[V]].runtimeClass +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index db56345ca8..7a6b1ea35e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer -import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import org.apache.spark.streaming.{Duration, Interval, Time} import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index 84e69f277b..880a89bc36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index e0ff3ccba4..cc583295a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.Partitioner import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index aeea060df7..7cd4554282 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 0d84ec84f2..4ecba03ab5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -17,9 +17,8 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD -import collection.mutable.ArrayBuffer import org.apache.spark.rdd.UnionRDD import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 162b19d7f0..e7403b5f1e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.util import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ForEachDStream +import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} import StreamingContext._ import scala.util.Random diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 2e3a1e66ad..d293d20644 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._ import util.ManualClock import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.streaming.dstream.DStream class BasicOperationsSuite extends TestSuiteBase { test("map") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 9590bca989..21a72e7cea 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -26,7 +26,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.SparkConf diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9eb9b3684c..e0232c70a8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkException, SparkConf, SparkContext} import org.apache.spark.util.{Utils, MetadataCleaner} +import org.apache.spark.streaming.dstream.DStream class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index fa64142096..9e0f2c900e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.streaming.dstream.DStream class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 3569624d51..75093d6106 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index c39abfc21b..8f3c2dd86c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.DStream class WindowOperationsSuite extends TestSuiteBase { -- cgit v1.2.3 From 034f89aaab1db95e8908432f2445d6841526efcf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 12 Jan 2014 19:02:27 -0800 Subject: Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams. 
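As a minimal sketch of the behaviour this commit is after (assuming a StreamingContext `ssc` with a 10-second batch interval; the host and port are illustrative and not taken from the patch):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds

    // Receiver-based input streams now default to MEMORY_AND_DISK_SER_2 for received blocks.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    // window() persists its parent (MEMORY_ONLY_SER) because the windowed RDDs reuse its data.
    val windowed = words.window(Seconds(30), Seconds(10))

    // Persisting the windowed stream itself now forwards the level to the parent rather than
    // storing extra copies of the same underlying data; `windowed` stays at StorageLevel.NONE.
    windowed.persist(StorageLevel.MEMORY_ONLY)
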
--- .../org/apache/spark/streaming/flume/FlumeUtils.scala | 1 + .../org/apache/spark/streaming/kafka/KafkaUtils.scala | 3 ++- .../org/apache/spark/streaming/mqtt/MQTTUtils.scala | 1 + .../apache/spark/streaming/twitter/TwitterUtils.scala | 4 ++++ .../scala/org/apache/spark/streaming/DStreamGraph.scala | 2 +- .../org/apache/spark/streaming/StreamingContext.scala | 5 +++-- .../spark/streaming/api/java/JavaStreamingContext.scala | 4 ++-- .../spark/streaming/dstream/WindowedDStream.scala | 17 +++++++++++++---- .../apache/spark/streaming/WindowOperationsSuite.scala | 14 ++++++++++++++ 9 files changed, 41 insertions(+), 10 deletions(-) (limited to 'streaming/src') diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index a01c17ac5d..a6af53e4a6 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -43,6 +43,7 @@ object FlumeUtils { /** * Creates a input stream from a Flume source. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index df4ecac8d1..76f9c46657 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -78,6 +78,7 @@ object KafkaUtils { /** * Create an input stream that pulls messages form a Kafka Broker. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer @@ -128,7 +129,7 @@ object KafkaUtils { * see http://kafka.apache.org/08/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread - * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2. + * @param storageLevel RDD storage level. */ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]]( jssc: JavaStreamingContext, diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index eacb26f6c5..caa86b27a0 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -44,6 +44,7 @@ object MQTTUtils { /** * Create an input stream that receives messages pushed by a MQTT publisher. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. 
* @param jssc JavaStreamingContext object * @param brokerUrl Url of remote MQTT publisher * @param topic Topic name to subscribe to diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index 8ea52c4e5b..a23d685144 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -51,6 +51,7 @@ object TwitterUtils { * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object */ def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { @@ -62,6 +63,7 @@ object TwitterUtils { * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param filters Set of filter strings to get only those tweets that match them */ @@ -88,6 +90,7 @@ object TwitterUtils { /** * Create a input stream that returns tweets received from Twitter. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization */ @@ -97,6 +100,7 @@ object TwitterUtils { /** * Create a input stream that returns tweets received from Twitter. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 31038a06b8..8faa79f8c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def remember(duration: Duration) { this.synchronized { if (rememberDuration != null) { - throw new Exception("Batch duration already set as " + batchDuration + + throw new Exception("Remember duration already set as " + batchDuration + ". cannot set it again.") } rememberDuration = duration diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ee83ae902b..7b27933403 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -168,7 +168,7 @@ class StreamingContext private[streaming] ( } /** - * Set the context to periodically checkpoint the DStream operations for master + * Set the context to periodically checkpoint the DStream operations for driver * fault-tolerance. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored. 
* Note that this must be a fault-tolerant file system like HDFS for @@ -220,7 +220,7 @@ class StreamingContext private[streaming] ( def actorStream[T: ClassTag]( props: Props, name: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy ): DStream[T] = { networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) @@ -272,6 +272,7 @@ class StreamingContext private[streaming] ( * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) * @tparam T Type of the objects in the received blocks */ def rawSocketStream[T: ClassTag]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index a9d605d55d..a2f0b88cb0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -151,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel) : JavaDStream[String] = { @@ -161,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited - * lines. + * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data */ @@ -302,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream with any arbitrary user implemented actor receiver. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. 
* @param props Props object defining creation of the actor * @param name Name of the actor * diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 89c43ff935..6301772468 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag]( extends DStream[T](parent.ssc) { if (!_windowDuration.isMultipleOf(parent.slideDuration)) - throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " + + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") if (!_slideDuration.isMultipleOf(parent.slideDuration)) - throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " + + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + // Persist parent level by default, as those RDDs are going to be obviously reused. parent.persist(StorageLevel.MEMORY_ONLY_SER) def windowDuration: Duration = _windowDuration @@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag]( override def parentRememberDuration: Duration = rememberDuration + windowDuration + override def persist(level: StorageLevel): DStream[T] = { + // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying + // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data. + // Instead control the persistence of the parent DStream. 
+ parent.persist(level) + this + } + override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index 8f3c2dd86c..471c99fab4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.storage.StorageLevel class WindowOperationsSuite extends TestSuiteBase { @@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase { Seconds(3) ) + test("window - persistence level") { + val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)) + val ssc = new StreamingContext(conf, batchDuration) + val inputStream = new TestInputStream[Int](ssc, input, 1) + val windowStream1 = inputStream.window(batchDuration * 2) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER) + windowStream1.persist(StorageLevel.MEMORY_ONLY) + assert(windowStream1.storageLevel === StorageLevel.NONE) + assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY) + ssc.stop() + } + // Testing naive reduceByKeyAndWindow (without invertible function) testReduceByKeyAndWindow( -- cgit v1.2.3
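
For reference, a minimal usage sketch of the windowed pair-DStream operations that the first patch above moves into org.apache.spark.streaming.dstream; the master URL, application name, checkpoint directory, host and port are illustrative assumptions, not taken from the patches:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // implicit conversion to PairDStreamFunctions

    val ssc = new StreamingContext("local[2]", "WindowedWordCount", Seconds(10))
    ssc.checkpoint("/tmp/streaming-checkpoint")  // required by the incremental (invertible) variant below

    val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map(w => (w, 1))

    // Naive sliding count: re-reduces the whole 60-second window every 10 seconds.
    val counts = pairs.reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))

    // Incremental count: reduces the values entering the window and "inverse reduces" those leaving it.
    val incrementalCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))

    counts.print()
    incrementalCounts.print()
    ssc.start()
    ssc.awaitTermination()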