aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-02-19 15:35:23 -0800
committerAndrew Or <andrew@databricks.com>2015-02-19 15:35:23 -0800
commit34b7c35380c88569a1396fb4ed991a0bed4288e7 (patch)
treedeb25f1dd88477aff0dc19904cbdc9aedc0cc7d8 /extras
parentad6b169dee84df175b51933b7a3ad7f0bbc52cf3 (diff)
downloadspark-34b7c35380c88569a1396fb4ed991a0bed4288e7.tar.gz
spark-34b7c35380c88569a1396fb4ed991a0bed4288e7.tar.bz2
spark-34b7c35380c88569a1396fb4ed991a0bed4288e7.zip
SPARK-4682 [CORE] Consolidate various 'Clock' classes
Another one from JoshRosen 's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names. Author: Sean Owen <sowen@cloudera.com> Closes #4514 from srowen/SPARK-4682 and squashes the following commits: 5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark] 169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names 277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis() 160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock 7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala10
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala2
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala25
3 files changed, 17 insertions, 20 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 0b80b611cd..588e86a188 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -18,9 +18,7 @@ package org.apache.spark.streaming.kinesis
import org.apache.spark.Logging
import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.util.SystemClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
/**
* This is a helper class for managing checkpoint clocks.
@@ -35,7 +33,7 @@ private[kinesis] class KinesisCheckpointState(
/* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
val checkpointClock = new ManualClock()
- checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+ checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
/**
* Check if it's time to checkpoint based on the current time and the derived time
@@ -44,13 +42,13 @@ private[kinesis] class KinesisCheckpointState(
* @return true if it's time to checkpoint
*/
def shouldCheckpoint(): Boolean = {
- new SystemClock().currentTime() > checkpointClock.currentTime()
+ new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis()
}
/**
* Advance the checkpoint clock by the checkpoint interval.
*/
def advanceCheckpoint() = {
- checkpointClock.addToTime(checkpointInterval.milliseconds)
+ checkpointClock.advance(checkpointInterval.milliseconds)
}
}
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 8ecc2d9016..af8cd875b4 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -104,7 +104,7 @@ private[kinesis] class KinesisRecordProcessor(
logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
s" records for shardId $shardId")
logDebug(s"Checkpoint: Next checkpoint is at " +
- s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+ s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
}
} catch {
case e: Throwable => {
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index f56898af02..255fe65819 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.streaming.util.Clock
-import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.{ManualClock, Clock}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
@@ -129,45 +128,45 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("should checkpoint if we have exceeded the checkpoint interval") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
assert(checkpointState.shouldCheckpoint())
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
assert(!checkpointState.shouldCheckpoint())
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("should add to time when advancing checkpoint") {
- when(currentClockMock.currentTime()).thenReturn(0)
+ when(currentClockMock.getTimeMillis()).thenReturn(0)
val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
- assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ assert(checkpointState.checkpointClock.getTimeMillis() == checkpointIntervalMillis)
checkpointState.advanceCheckpoint()
- assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+ assert(checkpointState.checkpointClock.getTimeMillis() == (2 * checkpointIntervalMillis))
- verify(currentClockMock, times(1)).currentTime()
+ verify(currentClockMock, times(1)).getTimeMillis()
}
test("shutdown should checkpoint if the reason is TERMINATE") {