aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-13 20:53:50 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-13 20:53:50 -0800
commit12b020b6689b8db94df904d9b897a43bce18c971 (patch)
tree4507472a54ec887c3888ade89c4bdbf652f2056c /streaming/src/main
parent39addd380363c0371e935fae50983fe87158c1ac (diff)
downloadspark-12b020b6689b8db94df904d9b897a43bce18c971.tar.gz
spark-12b020b6689b8db94df904d9b897a43bce18c971.tar.bz2
spark-12b020b6689b8db94df904d9b897a43bce18c971.zip
Added filter functionality to reduceByKeyAndWindow with inverse. Consolidated reduceByKeyAndWindow's many functions into smaller number of functions with optional parameters.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala71
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala28
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala30
-rw-r--r--streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala1
5 files changed, 68 insertions, 64 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0c1b667c0a..6abec9e6be 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] (
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = {
- this.window(windowDuration, slideDuration).reduce(reduceFunc)
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
def reduceByWindow(
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf061126..021ff83b36 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -137,7 +137,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+ * then Spark's default number of partitions will be used
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -155,7 +156,7 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -213,7 +214,7 @@ extends Serializable {
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
+ reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -230,7 +231,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD
+ * in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
@@ -245,7 +247,7 @@ extends Serializable {
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -253,81 +255,64 @@ extends Serializable {
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- */
- def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
- windowDuration: Duration,
- slideDuration: Duration
- ): DStream[(K, V)] = {
-
- reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
- }
-
- /**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
- * 1. reduce the new values that entered the window (e.g., adding new counts)
- * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
- * However, it is applicable to only "invertible reduce functions".
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int
+ slideDuration: Duration = self.slideDuration,
+ numPartitions: Int = ssc.sc.defaultParallelism,
+ filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = {
reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ reduceFunc, invReduceFunc, windowDuration,
+ slideDuration, defaultPartitioner(numPartitions), filterFunc
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
+ partitioner: Partitioner,
+ filterFunc: ((K, V)) => Boolean
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+ val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
- self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
+ self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+ windowDuration, slideDuration, partitioner
+ )
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..4d3e0d0304 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -328,7 +328,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -342,25 +342,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- numPartitions: Int
+ numPartitions: Int,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- numPartitions)
+ numPartitions,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -374,20 +380,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
- ): JavaPairDStream[K, V] = {
+ partitioner: Partitioner,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- partitioner)
+ partitioner,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25..aa5a71e1ed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -3,7 +3,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.CoGroupedRDD
+import spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
@@ -15,7 +15,8 @@ private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ filterFunc: Option[((K, V)) => Boolean],
_windowDuration: Duration,
_slideDuration: Duration,
partitioner: Partitioner
@@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
//
// Get the RDDs of the reduced values in "old time steps"
- val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+ val oldRDDs =
+ reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+ val newRDDs =
+ reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)
// Get the RDD of the reduced value of the previous window
- val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+ val previousWindowRDD =
+ getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
// Make the list of RDDs that needs to cogrouped together for reducing their reduced values
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
- //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ val cogroupedRDD =
+ new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
@@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
// Getting reduced values "old time steps" that will be removed from current window
val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
- val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ val newValues =
+ (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
@@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
- Some(mergedValuesRDD)
+ if (filterFunc.isDefined) {
+ Some(mergedValuesRDD.filter(filterFunc.get))
+ } else {
+ Some(mergedValuesRDD)
+ }
}
-
-
}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 3ffe4b64d0..83d8591a3a 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -291,7 +291,6 @@ class TestOutputStream[T: ClassManifest](
(rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- println(t + ": " + collected.mkString("[", ",", "]"))
}
) {