Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala        6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala         3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala         13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala                     89
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala             5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala       5
8 files changed, 25 insertions(+), 106 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4f7db41abe..22de8c02e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -88,7 +88,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
- private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
+ private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L
/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -161,7 +161,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
- lastNewFileFindingTime = clock.currentTime()
+ lastNewFileFindingTime = clock.getTimeMillis()
// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -174,7 +174,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
- val timeTaken = clock.currentTime() - lastNewFileFindingTime
+ val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
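The three changes above are a mechanical rename: currentTime() on the old streaming Clock becomes getTimeMillis() on the relocated org.apache.spark.util.Clock. A minimal sketch of the elapsed-time pattern findNewFiles uses, assuming only that getTimeMillis() returns the clock's notion of "now" in milliseconds; timed is an illustrative helper, not part of this patch, and since Clock is package-private a snippet like this would live inside Spark's own tree:

    import org.apache.spark.util.{Clock, SystemClock}

    object TimedScan {
      // Measure how long a body takes according to a pluggable clock.
      def timed[T](clock: Clock)(body: => T): (T, Long) = {
        val start = clock.getTimeMillis()
        val result = body
        (result, clock.getTimeMillis() - start)
      }

      def main(args: Array[String]): Unit = {
        val (files, tookMs) = timed(new SystemClock()) {
          Seq("f1", "f2") // stand-in for fs.listStatus(directoryPath, filter)
        }
        println(s"Finding new files took $tookMs ms (${files.size} files)")
      }
    }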
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 79263a7183..ee5e639b26 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.SystemClock
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f7a8ebee8a..dcdc27d29c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 8632c94349..ac92774a38 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -23,7 +23,8 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
-import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, ManualClock}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
@@ -45,8 +46,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val clock = {
val clockClass = ssc.sc.conf.get(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ "spark.streaming.clock", "org.apache.spark.util.SystemClock")
+ try {
+ Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ } catch {
+ case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
+ val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
+ Class.forName(newClockClass).newInstance().asInstanceOf[Clock]
+ }
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
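The rewritten clock lookup keeps old configurations working: the default moves to org.apache.spark.util.SystemClock, and a spark.streaming.clock value that still names a removed streaming-package class is rewritten to the new package when Class.forName throws ClassNotFoundException. A hedged illustration from the configuration side; the value below is the legacy class name, shown only to demonstrate the fallback:

    val conf = new org.apache.spark.SparkConf()
      // Legacy class name from before this patch; the class no longer exists.
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
    // Class.forName fails, the catch rewrites the package prefix, and
    // org.apache.spark.util.ManualClock is instantiated instead.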
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index e19ac939f9..200cf4ef4b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.util.{Clock, Utils}
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -150,7 +150,7 @@ private[streaming] class ReceivedBlockTracker(
* returns only after the files are cleaned up.
*/
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
- assert(cleanupThreshTime.milliseconds < clock.currentTime())
+ assert(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
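The assert now reads the relocated clock but keeps the same invariant: a batch may only be cleaned up if its threshold time lies strictly before the current clock time. A small sketch with the relocated ManualClock, assuming it keeps the settable-time API (setTime) that the deleted streaming ManualClock below had:

    val clock = new org.apache.spark.util.ManualClock()
    clock.setTime(10000L)
    assert(5000L < clock.getTimeMillis())  // a past threshold satisfies the guard
    // A threshold at or after the current time (e.g. 10000L) would trip the
    // assert in cleanupOldBatches.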
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
deleted file mode 100644
index d6d96d7ba0..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-private[streaming]
-trait Clock {
- def currentTime(): Long
- def waitTillTime(targetTime: Long): Long
-}
-
-private[streaming]
-class SystemClock() extends Clock {
-
- val minPollTime = 25L
-
- def currentTime(): Long = {
- System.currentTimeMillis()
- }
-
- def waitTillTime(targetTime: Long): Long = {
- var currentTime = 0L
- currentTime = System.currentTimeMillis()
-
- var waitTime = targetTime - currentTime
- if (waitTime <= 0) {
- return currentTime
- }
-
- val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
-
- while (true) {
- currentTime = System.currentTimeMillis()
- waitTime = targetTime - currentTime
- if (waitTime <= 0) {
- return currentTime
- }
- val sleepTime = math.min(waitTime, pollTime)
- Thread.sleep(sleepTime)
- }
- -1
- }
-}
-
-private[streaming]
-class ManualClock() extends Clock {
-
- private var time = 0L
-
- def currentTime() = this.synchronized {
- time
- }
-
- def setTime(timeToSet: Long) = {
- this.synchronized {
- time = timeToSet
- this.notifyAll()
- }
- }
-
- def addToTime(timeToAdd: Long) = {
- this.synchronized {
- time += timeToAdd
- this.notifyAll()
- }
- }
- def waitTillTime(targetTime: Long): Long = {
- this.synchronized {
- while (time < targetTime) {
- this.wait(100)
- }
- }
- currentTime()
- }
-}
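The deletion above is a move, not a removal: every call site in this diff now imports Clock, SystemClock, and ManualClock from org.apache.spark.util. Inferred from those call sites, the relocated trait has roughly the following shape; the actual core file may differ in visibility and detail:

    package org.apache.spark.util

    private[spark] trait Clock {
      // Renamed from currentTime() in the deleted streaming trait.
      def getTimeMillis(): Long
      // Block until the clock reaches targetTime; return the time reached.
      def waitTillTime(targetTime: Long): Long
    }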
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 1a616a0434..c8eef833eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.util
import org.apache.spark.Logging
+import org.apache.spark.util.{Clock, SystemClock}
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
@@ -38,7 +39,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* current system time.
*/
def getStartTime(): Long = {
- (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
/**
@@ -48,7 +49,7 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
* more than current time.
*/
def getRestartTime(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
+ val gap = clock.getTimeMillis() - originalStartTime
(math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}
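getStartTime rounds the current time up to the next period boundary, and getRestartTime does the same relative to the original start time. A worked example with period = 1000 ms:

    val period = 1000L
    // getStartTime at getTimeMillis() == 2345:
    //   (floor(2345 / 1000.0) + 1) * 1000 = 3000
    val startTime = (math.floor(2345.0 / period) + 1).toLong * period            // 3000
    // getRestartTime(500) at the same instant: gap = 2345 - 500 = 1845, so
    //   (floor(1845 / 1000.0) + 1) * 1000 + 500 = 2500
    val restartTime = (math.floor(1845.0 / period).toLong + 1) * period + 500L   // 2500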
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 166661b749..985ded9111 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,13 +19,12 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
import WriteAheadLogManager._
/**
@@ -82,7 +81,7 @@ private[streaming] class WriteAheadLogManager(
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
- fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+ fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
succeeded = true
} catch {
case ex: Exception =>
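The write path stamps each new log file with the current clock time and retries up to maxFailures times before giving up. A minimal sketch of that retry shape, assuming only the relocated Clock; writeWithRetries and attempt are illustrative names, not the WriteAheadLogManager API:

    import org.apache.spark.util.Clock

    // Retry a clock-stamped action, rethrowing once maxFailures is reached.
    def writeWithRetries[T](clock: Clock, maxFailures: Int)(attempt: Long => T): T = {
      var failures = 0
      var result: Option[T] = None
      while (result.isEmpty) {
        try {
          result = Some(attempt(clock.getTimeMillis()))
        } catch {
          case e: Exception =>
            failures += 1
            if (failures >= maxFailures) throw e
        }
      }
      result.get
    }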