diff options
author | Sean Owen <sowen@cloudera.com> | 2015-02-19 15:35:23 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-19 15:35:31 -0800 |
commit | bd49e8b962b397b8fb8b22f980739021cf1a195e (patch) | |
tree | 1d3214d14afaf25bc5a1cc9093bd325eaaedfb28 /extras | |
parent | ff8976ec7825bec0f0c30bf763a8ad3dcc14542b (diff) | |
download | spark-bd49e8b962b397b8fb8b22f980739021cf1a195e.tar.gz spark-bd49e8b962b397b8fb8b22f980739021cf1a195e.tar.bz2 spark-bd49e8b962b397b8fb8b22f980739021cf1a195e.zip |
SPARK-4682 [CORE] Consolidate various 'Clock' classes
Another one from JoshRosen 's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names.
Author: Sean Owen <sowen@cloudera.com>
Closes #4514 from srowen/SPARK-4682 and squashes the following commits:
5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark]
169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names
277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way
b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis()
160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock
7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock
(cherry picked from commit 34b7c35380c88569a1396fb4ed991a0bed4288e7)
Signed-off-by: Andrew Or <andrew@databricks.com>
Diffstat (limited to 'extras')
3 files changed, 17 insertions, 20 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala index 0b80b611cd..588e86a188 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala @@ -18,9 +18,7 @@ package org.apache.spark.streaming.kinesis import org.apache.spark.Logging import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.util.Clock -import org.apache.spark.streaming.util.ManualClock -import org.apache.spark.streaming.util.SystemClock +import org.apache.spark.util.{Clock, ManualClock, SystemClock} /** * This is a helper class for managing checkpoint clocks. @@ -35,7 +33,7 @@ private[kinesis] class KinesisCheckpointState( /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */ val checkpointClock = new ManualClock() - checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds) + checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds) /** * Check if it's time to checkpoint based on the current time and the derived time @@ -44,13 +42,13 @@ private[kinesis] class KinesisCheckpointState( * @return true if it's time to checkpoint */ def shouldCheckpoint(): Boolean = { - new SystemClock().currentTime() > checkpointClock.currentTime() + new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis() } /** * Advance the checkpoint clock by the checkpoint interval. */ def advanceCheckpoint() = { - checkpointClock.addToTime(checkpointInterval.milliseconds) + checkpointClock.advance(checkpointInterval.milliseconds) } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 8ecc2d9016..af8cd875b4 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -104,7 +104,7 @@ private[kinesis] class KinesisRecordProcessor( logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" + s" records for shardId $shardId") logDebug(s"Checkpoint: Next checkpoint is at " + - s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId") + s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId") } } catch { case e: Throwable => { diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index f56898af02..255fe65819 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -25,8 +25,7 @@ import org.apache.spark.streaming.Milliseconds import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.TestSuiteBase -import org.apache.spark.streaming.util.Clock -import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.util.{ManualClock, Clock} import org.mockito.Mockito._ import org.scalatest.BeforeAndAfter @@ -129,45 +128,45 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft } test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") { - when(currentClockMock.currentTime()).thenReturn(0) + when(currentClockMock.getTimeMillis()).thenReturn(0) val checkpointIntervalMillis = 10 val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock) - assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis) + assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis) - verify(currentClockMock, times(1)).currentTime() + verify(currentClockMock, times(1)).getTimeMillis() } test("should checkpoint if we have exceeded the checkpoint interval") { - when(currentClockMock.currentTime()).thenReturn(0) + when(currentClockMock.getTimeMillis()).thenReturn(0) val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock) assert(checkpointState.shouldCheckpoint()) - verify(currentClockMock, times(1)).currentTime() + verify(currentClockMock, times(1)).getTimeMillis() } test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { - when(currentClockMock.currentTime()).thenReturn(0) + when(currentClockMock.getTimeMillis()).thenReturn(0) val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock) assert(!checkpointState.shouldCheckpoint()) - verify(currentClockMock, times(1)).currentTime() + verify(currentClockMock, times(1)).getTimeMillis() } test("should add to time when advancing checkpoint") { - when(currentClockMock.currentTime()).thenReturn(0) + when(currentClockMock.getTimeMillis()).thenReturn(0) val checkpointIntervalMillis = 10 val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock) - assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis) + assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis) checkpointState.advanceCheckpoint() - assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis)) + assert(checkpointState.checkpointClock.getTimeMillis() == (2 * checkpointIntervalMillis)) - verify(currentClockMock, times(1)).currentTime() + verify(currentClockMock, times(1)).getTimeMillis() } test("shutdown should checkpoint if the reason is TERMINATE") { |