aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala15
1 files changed, 8 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 40da313189..1a47089e51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -133,17 +133,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
- val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
- if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+ val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
+ if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
throw new Exception("Unexpected number of sequences of reduced values")
}
// Getting reduced values "old time steps" that will be removed from current window
- val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+ val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
val newValues =
- (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
- if (seqOfValues(0).isEmpty) {
+ if (arrayOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
throw new Exception("Neither previous window has value for key, nor new values found. " +
@@ -153,7 +153,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
newValues.reduce(reduceF) // return
} else {
// Get the previous window's reduced value
- var tempValue = seqOfValues(0).head
+ var tempValue = arrayOfValues(0).head
// If old values exists, then inverse reduce then from previous value
if (!oldValues.isEmpty) {
tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
@@ -166,7 +166,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
}
}
- val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
+ val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
+ .mapValues(mergeValues)
if (filterFunc.isDefined) {
Some(mergedValuesRDD.filter(filterFunc.get))