aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala10
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala2
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala25
3 files changed, 17 insertions, 20 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 0b80b611cd..588e86a188 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -18,9 +18,7 @@ package org.apache.spark.streaming.kinesis
import org.apache.spark.Logging
import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.util.SystemClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
/**
* This is a helper class for managing checkpoint clocks.
@@ -35,7 +33,7 @@ private[kinesis] class KinesisCheckpointState(
/* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
val checkpointClock = new ManualClock()
- checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+ checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
/**
* Check if it's time to checkpoint based on the current time and the derived time
@@ -44,13 +42,13 @@ private[kinesis] class KinesisCheckpointState(
* @return true if it's time to checkpoint
*/
def shouldCheckpoint(): Boolean = {
- new SystemClock().currentTime() > checkpointClock.currentTime()
+ new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
}
/**
* Advance the checkpoint clock by the checkpoint interval.
*/
def advanceCheckpoint() = {
- checkpointClock.addToTime(checkpointInterval.milliseconds)
+ checkpointClock.advance(checkpointInterval.milliseconds)
}
}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 8ecc2d9016..af8cd875b4 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -104,7 +104,7 @@ private[kinesis] class KinesisRecordProcessor(
logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
s" records for shardId $shardId")
logDebug(s"Checkpoint: Next checkpoint is at " +
- s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+ s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
}
} catch {
case e: Throwable => {
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index f56898af02..255fe65819 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.{ManualClock, Clock}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
@@ -129,45 +128,45 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("should checkpoint if we have exceeded the checkpoint interval") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
assert(checkpointState.shouldCheckpoint())
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
assert(!checkpointState.shouldCheckpoint())
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("should add to time when advancing checkpoint") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
checkpointState.advanceCheckpoint()
- assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+ assert(checkpointState.checkpointClock.getTimeMillis() == (2 * checkpointIntervalMillis))
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("shutdown should checkpoint if the reason is TERMINATE") {