diff options
author | Wesley Miao <wesley.miao@gmail.com> | 2015-05-11 12:20:06 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-05-11 12:20:06 +0100 |
commit | d70a076892e0677acceccaba665908cdf664f1b4 (patch) | |
tree | 4360a67173bf5322920fc5aee05521f218994a3d /streaming/src/test | |
parent | 2242ab31e99227a102b0918d73db67e99899fd24 (diff) | |
download | spark-d70a076892e0677acceccaba665908cdf664f1b4.tar.gz spark-d70a076892e0677acceccaba665908cdf664f1b4.tar.bz2 spark-d70a076892e0677acceccaba665908cdf664f1b4.zip |
[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
tdas
https://issues.apache.org/jira/browse/SPARK-7326
The problem most likely resides in DStream.slice() implementation, as shown below.
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
if (!isInitialized) {
throw new SparkException(this + " has not been initialized")
}
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
logInfo("Slicing from " + fromTime + " to " + toTime +
" (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
if (time >= zeroTime) getOrCompute(time) else None
})
}
Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.
The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :
def floor(that: Duration, zeroTime: Time): Time = {
val t = that.milliseconds
new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
}
And then change the DStream.slice to call this new floor function by passing in its zeroTime.
val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.
Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>
Closes #5871 from wesleymiao/spark-7326 and squashes the following commits:
82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala | 3 |
1 files changed, 3 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala index 5579ac3643..e6a01656f4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala @@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase { assert(new Time(1200).floor(new Duration(200)) == new Time(1200)) assert(new Time(199).floor(new Duration(200)) == new Time(0)) assert(new Time(1).floor(new Duration(1)) == new Time(1)) + assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250)) + assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350)) + assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200)) } test("isMultipleOf") { |