aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Time.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala22
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala3
3 files changed, 22 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 42c49678d2..92cfd7d403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -63,6 +63,11 @@ case class Time(private val millis: Long) {
new Time((this.millis / t) * t)
}
+ def floor(that: Duration, zeroTime: Time): Time = {
+ val t = that.milliseconds
+ new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
+ }
+
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.milliseconds == 0)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index f1f8a70655..7092a3d3f0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new SparkException(this + " has not been initialized")
}
- if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
- logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
- + slideDuration + ")")
+
+ val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
+ toTime
+ } else {
+ logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+ + slideDuration + ")")
+ toTime.floor(slideDuration, zeroTime)
}
- if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
- logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
- + slideDuration + ")")
+
+ val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ fromTime
+ } else {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ + slideDuration + ")")
+ fromTime.floor(slideDuration, zeroTime)
}
- val alignedToTime = toTime.floor(slideDuration)
- val alignedFromTime = fromTime.floor(slideDuration)
logInfo("Slicing from " + fromTime + " to " + toTime +
" (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
index 5579ac3643..e6a01656f4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
@@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase {
assert(new Time(1200).floor(new Duration(200)) == new Time(1200))
assert(new Time(199).floor(new Duration(200)) == new Time(0))
assert(new Time(1).floor(new Duration(1)) == new Time(1))
+ assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250))
+ assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350))
+ assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200))
}
test("isMultipleOf") {