aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/spark/streaming/DStream.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/spark/streaming/DStream.scala')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala230
1 files changed, 128 insertions, 102 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 352f83fe0c..9be7926a4a 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
/**
@@ -75,7 +75,7 @@ abstract class DStream[T: ClassManifest] (
// Checkpoint details
protected[streaming] val mustCheckpoint = false
protected[streaming] var checkpointDuration: Duration = null
- protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
+ protected[streaming] val checkpointData = new DStreamCheckpointData(this)
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
@@ -85,10 +85,10 @@ abstract class DStream[T: ClassManifest] (
// Duration for which the DStream requires its parent DStream to remember each RDD created
protected[streaming] def parentRememberDuration = rememberDuration
- /** Returns the StreamingContext associated with this DStream */
- def context() = ssc
+ /** Return the StreamingContext associated with this DStream */
+ def context = ssc
- /** Persists the RDDs of this DStream with the given storage level */
+ /** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] (
// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
- checkpointDuration = slideDuration.max(Seconds(10))
+ checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo("Checkpoint interval automatically set to " + checkpointDuration)
}
@@ -159,7 +159,7 @@ abstract class DStream[T: ClassManifest] (
)
assert(
- checkpointDuration == null || ssc.sc.checkpointDir.isDefined,
+ checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
"The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
" or SparkContext.checkpoint() to set the checkpoint directory."
)
@@ -238,13 +238,15 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.remember(parentRememberDuration))
}
- /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */
+ /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
+ logInfo("Time " + time + " is valid")
true
}
}
@@ -292,14 +294,14 @@ abstract class DStream[T: ClassManifest] (
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. ForEachDStream).
+ * to generate their own jobs.
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
- val emptyFunc = { (iterator: Iterator[T]) => {} }
- ssc.sc.runJob(rdd, emptyFunc)
+ val emptyFunc = { (iterator: Iterator[T]) => {} }
+ context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
@@ -308,20 +310,18 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereference RDDs that are older than rememberDuration.
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
*/
- protected[streaming] def forgetOldRDDs(time: Time) {
- val keys = generatedRDDs.keys
+ protected[streaming] def clearOldMetadata(time: Time) {
var numForgotten = 0
- keys.foreach(t => {
- if (t <= (time - rememberDuration)) {
- generatedRDDs.remove(t)
- numForgotten += 1
- logInfo("Forgot RDD of time " + t + " from " + this)
- }
- })
- logInfo("Forgot " + numForgotten + " RDDs from " + this)
- dependencies.foreach(_.forgetOldRDDs(time))
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
@@ -342,40 +342,10 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
logInfo("Updating checkpoint data for time " + currentTime)
-
- // Get the checkpointed RDDs from the generated RDDs
- val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
- .map(x => (x._1, x._2.getCheckpointFile.get))
-
- // Make a copy of the existing checkpoint data (checkpointed RDDs)
- val oldRdds = checkpointData.rdds.clone()
-
- // If the new checkpoint data has checkpoints then replace existing with the new one
- if (newRdds.size > 0) {
- checkpointData.rdds.clear()
- checkpointData.rdds ++= newRdds
- }
-
- // Make parent DStreams update their checkpoint data
+ checkpointData.update()
dependencies.foreach(_.updateCheckpointData(currentTime))
-
- // TODO: remove this, this is just for debugging
- newRdds.foreach {
- case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
- }
-
- if (newRdds.size > 0) {
- (oldRdds -- newRdds.keySet).foreach {
- case (time, data) => {
- val path = new Path(data.toString)
- val fs = path.getFileSystem(new Configuration())
- fs.delete(path, true)
- logInfo("Deleted checkpoint file '" + path + "' for time " + time)
- }
- }
- }
- logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, "
- + "[" + checkpointData.rdds.mkString(",") + "]")
+ checkpointData.cleanup()
+ logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
/**
@@ -386,14 +356,8 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def restoreCheckpointData() {
// Create RDDs from the checkpoint data
- logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
- checkpointData.rdds.foreach {
- case(time, data) => {
- logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.checkpointFile[T](data.toString)
- generatedRDDs += ((time, rdd))
- }
- }
+ logInfo("Restoring checkpoint data")
+ checkpointData.restore()
dependencies.foreach(_.restoreCheckpointData())
logInfo("Restored checkpoint data")
}
@@ -433,7 +397,7 @@ abstract class DStream[T: ClassManifest] (
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
- new MappedDStream(this, ssc.sc.clean(mapFunc))
+ new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
/**
@@ -441,7 +405,7 @@ abstract class DStream[T: ClassManifest] (
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
- new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
+ new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
/** Return a new DStream containing only the elements that satisfy a predicate. */
@@ -463,7 +427,7 @@ abstract class DStream[T: ClassManifest] (
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
+ new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
}
/**
@@ -477,14 +441,28 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+ def count(): DStream[Long] = {
+ this.map(_ => (null, 1L))
+ .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+ .reduceByKey(_ + _)
+ .map(_._2)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ */
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+ this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
- foreach((r: RDD[T], t: Time) => foreachFunc(r))
+ this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
}
/**
@@ -492,7 +470,7 @@ abstract class DStream[T: ClassManifest] (
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
@@ -510,7 +488,7 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, ssc.sc.clean(transformFunc))
+ new TransformedDStream(this, context.sparkContext.clean(transformFunc))
}
/**
@@ -527,19 +505,21 @@ abstract class DStream[T: ClassManifest] (
if (first11.size > 10) println("...")
println()
}
- val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
- * The new DStream generates RDDs with the same interval as this DStream.
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
- * Return a new DStream which is computed based on windowed batches of this DStream.
+ * Return a new DStream in which each RDD contains all the elements in seen in a
+ * sliding window of time over this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -551,27 +531,39 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Return a new DStream which computed based on tumbling window on this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchDuration tumbling window duration; must be a multiple of this DStream's
- * batching interval
- */
- def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
-
- /**
* Return a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined
- * in the window() operation. This is equivalent to
- * window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = {
- this.window(windowDuration, slideDuration).reduce(reduceFunc)
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
@@ -585,14 +577,47 @@ abstract class DStream[T: ClassManifest] (
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
/**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int = ssc.sc.defaultParallelism
+ ): DStream[(T, Long)] = {
+
+ this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ (x: Long, y: Long) => x + y,
+ (x: Long, y: Long) => x - y,
+ windowDuration,
+ slideDuration,
+ numPartitions,
+ (x: (T, Long)) => x._2 != 0L
+ )
+ }
+
+ /**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
@@ -609,16 +634,21 @@ abstract class DStream[T: ClassManifest] (
* Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- var time = toTime.floor(slideDuration)
- while (time >= zeroTime && time >= fromTime) {
- getOrCompute(time) match {
- case Some(rdd) => rdds += rdd
- case None => //throw new Exception("Could not get RDD for time " + time)
- }
- time -= slideDuration
+ if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
}
- rdds.toSeq
+ val alignedToTime = toTime.floor(slideDuration)
+ val alignedFromTime = fromTime.floor(slideDuration)
+
+ logInfo("Slicing from " + fromTime + " to " + toTime +
+ " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ if (time >= zeroTime) getOrCompute(time) else None
+ })
}
/**
@@ -651,7 +681,3 @@ abstract class DStream[T: ClassManifest] (
ssc.registerOutputStream(this)
}
}
-
-private[streaming]
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-