aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-13 20:53:50 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-13 20:53:50 -0800
commit12b020b6689b8db94df904d9b897a43bce18c971 (patch)
tree4507472a54ec887c3888ade89c4bdbf652f2056c
parent39addd380363c0371e935fae50983fe87158c1ac (diff)
downloadspark-12b020b6689b8db94df904d9b897a43bce18c971.tar.gz
spark-12b020b6689b8db94df904d9b897a43bce18c971.tar.bz2
spark-12b020b6689b8db94df904d9b897a43bce18c971.zip
Added filter functionality to reduceByKeyAndWindow with inverse. Consolidated reduceByKeyAndWindow's many functions into smaller number of functions with optional parameters.
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala71
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala28
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala30
-rw-r--r--streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala1
-rw-r--r--streaming/src/test/resources/log4j.properties2
-rw-r--r--streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala49
7 files changed, 102 insertions, 81 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0c1b667c0a..6abec9e6be 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] (
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = {
- this.window(windowDuration, slideDuration).reduce(reduceFunc)
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
def reduceByWindow(
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf061126..021ff83b36 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -137,7 +137,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+ * then Spark's default number of partitions will be used
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -155,7 +156,7 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -213,7 +214,7 @@ extends Serializable {
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
+ reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -230,7 +231,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD
+ * in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
@@ -245,7 +247,7 @@ extends Serializable {
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -253,81 +255,64 @@ extends Serializable {
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
- * @param windowDuration width of the window; must be a multiple of this DStream's
- * batching interval
- * @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's batching interval
- */
- def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
- windowDuration: Duration,
- slideDuration: Duration
- ): DStream[(K, V)] = {
-
- reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
- }
-
- /**
- * Create a new DStream by reducing over a using incremental computation.
- * The reduced value of over a new window is calculated using the old window's reduce value :
- * 1. reduce the new values that entered the window (e.g., adding new counts)
- * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
- * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
- * However, it is applicable to only "invertible reduce functions".
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int
+ slideDuration: Duration = self.slideDuration,
+ numPartitions: Int = ssc.sc.defaultParallelism,
+ filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = {
reduceByKeyAndWindow(
- reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ reduceFunc, invReduceFunc, windowDuration,
+ slideDuration, defaultPartitioner(numPartitions), filterFunc
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
- * @param invReduceFunc inverse function
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
+ partitioner: Partitioner,
+ filterFunc: ((K, V)) => Boolean
): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+ val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
- self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
+ self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+ windowDuration, slideDuration, partitioner
+ )
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..4d3e0d0304 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -328,7 +328,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -342,25 +342,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- numPartitions: Int
+ numPartitions: Int,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- numPartitions)
+ numPartitions,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
/**
- * Create a new DStream by reducing over a using incremental computation.
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -374,20 +380,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ * set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
- partitioner: Partitioner
- ): JavaPairDStream[K, V] = {
+ partitioner: Partitioner,
+ filterFunc: JFunction[(K, V), java.lang.Boolean]
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
- partitioner)
+ partitioner,
+ (p: (K, V)) => filterFunc(p).booleanValue()
+ )
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25..aa5a71e1ed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -3,7 +3,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.CoGroupedRDD
+import spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
@@ -15,7 +15,8 @@ private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ filterFunc: Option[((K, V)) => Boolean],
_windowDuration: Duration,
_slideDuration: Duration,
partitioner: Partitioner
@@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
//
// Get the RDDs of the reduced values in "old time steps"
- val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+ val oldRDDs =
+ reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+ val newRDDs =
+ reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
logDebug("# new RDDs = " + newRDDs.size)
// Get the RDD of the reduced value of the previous window
- val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+ val previousWindowRDD =
+ getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
// Make the list of RDDs that needs to cogrouped together for reducing their reduced values
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
- //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ val cogroupedRDD =
+ new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
@@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
// Getting reduced values "old time steps" that will be removed from current window
val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
- val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ val newValues =
+ (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
@@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
- Some(mergedValuesRDD)
+ if (filterFunc.isDefined) {
+ Some(mergedValuesRDD.filter(filterFunc.get))
+ } else {
+ Some(mergedValuesRDD)
+ }
}
-
-
}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 3ffe4b64d0..83d8591a3a 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -291,7 +291,6 @@ class TestOutputStream[T: ClassManifest](
(rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- println(t + ": " + collected.mkString("[", ",", "]"))
}
) {
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 5652596e1e..f0638e0e02 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,7 +1,7 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=WARN, file
# log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 1080790147..e6ac7b35aa 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -84,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase {
)
/*
- The output of the reduceByKeyAndWindow with inverse reduce function is
- different from the naive reduceByKeyAndWindow. Even if the count of a
- particular key is 0, the key does not get eliminated from the RDDs of
- ReducedWindowedDStream. This causes the number of keys in these RDDs to
- increase forever. A more generalized version that allows elimination of
- keys should be considered.
+ The output of the reduceByKeyAndWindow with inverse function but without a filter
+ function will be different from the naive reduceByKeyAndWindow, as no keys get
+ eliminated from the ReducedWindowedDStream even if the value of a key becomes 0.
*/
val bigReduceInvOutput = Seq(
@@ -177,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase {
// Testing reduceByKeyAndWindow (with invertible reduce function)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"basic reduction",
Seq(Seq(("a", 1), ("a", 3)) ),
Seq(Seq(("a", 4)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key already in window and new value added into window",
Seq( Seq(("a", 1)), Seq(("a", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"new key added into window",
Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key removed from window",
Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"larger slide time",
largerSlideInput,
largerSlideReduceOutput,
@@ -209,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(2)
)
- testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+ testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput)
+
+ testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput)
test("groupByKeyAndWindow") {
val input = bigInput
@@ -276,27 +275,45 @@ class WindowOperationsSuite extends TestSuiteBase {
test("reduceByKeyAndWindow - " + name) {
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
+ s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
- def testReduceByKeyAndWindowInv(
+ def testReduceByKeyAndWindowWithInverse(
name: String,
input: Seq[Seq[(String, Int)]],
expectedOutput: Seq[Seq[(String, Int)]],
windowDuration: Duration = Seconds(2),
slideDuration: Duration = Seconds(1)
) {
- test("reduceByKeyAndWindowInv - " + name) {
+ test("ReduceByKeyAndWindow with inverse function - " + name) {
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
- .persist()
.checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
+
+ def testReduceByKeyAndWindowWithFilteredInverse(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val filterFunc = (p: (String, Int)) => p._2 != 0
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
+ .persist()
+ .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
}