diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-08-06 14:52:46 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-08-06 14:52:46 -0700 |
commit | cae894ee7aefa4cf9b1952038a48be81e1d2a856 (patch) | |
tree | dc866109eb423e7ebfc9be646749e46ed0f3a30a /streaming | |
parent | 43b81eb2719c4666b7869d7d0290f2ee83daeafa (diff) | |
download | spark-cae894ee7aefa4cf9b1952038a48be81e1d2a856.tar.gz spark-cae894ee7aefa4cf9b1952038a48be81e1d2a856.tar.bz2 spark-cae894ee7aefa4cf9b1952038a48be81e1d2a856.zip |
Added new Clock interface that is used by RecurringTimer to scheduler events on system time or manually-configured time.
Diffstat (limited to 'streaming')
6 files changed, 130 insertions, 30 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index e19d2ecef5..c63c043415 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -197,7 +197,6 @@ extends Logging with Serializable { private[streaming] def toQueue = { val queue = new ArrayBlockingQueue[RDD[T]](10000) this.foreachRDD(rdd => { - println("Added RDD " + rdd.id) queue.add(rdd) }) queue diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala index 2481a9a3ef..0bd8343b9a 100644 --- a/streaming/src/main/scala/spark/streaming/Job.scala +++ b/streaming/src/main/scala/spark/streaming/Job.scala @@ -1,5 +1,7 @@ package spark.streaming +import java.util.concurrent.atomic.AtomicLong + class Job(val time: Time, func: () => _) { val id = Job.getNewId() def run(): Long = { @@ -13,11 +15,8 @@ class Job(val time: Time, func: () => _) { } object Job { - var lastId = 1 + val id = new AtomicLong(0) - def getNewId() = synchronized { - lastId += 1 - lastId - } + def getNewId() = id.getAndIncrement() } diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index fff4924b4c..309bd95525 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -1,6 +1,7 @@ package spark.streaming import spark.streaming.util.RecurringTimer +import spark.streaming.util.Clock import spark.SparkEnv import spark.Logging @@ -20,8 +21,10 @@ extends Logging { val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) - val timer = new RecurringTimer(ssc.batchDuration, generateRDDs(_)) - + val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") + val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] + val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_)) + def start() { val zeroTime = Time(timer.start()) diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala new file mode 100644 index 0000000000..72e786e0c3 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala @@ -0,0 +1,77 @@ +package spark.streaming.util + +import spark.streaming._ + +trait Clock { + def currentTime(): Long + def waitTillTime(targetTime: Long): Long +} + + +class SystemClock() extends Clock { + + val minPollTime = 25L + + def currentTime(): Long = { + System.currentTimeMillis() + } + + def waitTillTime(targetTime: Long): Long = { + var currentTime = 0L + currentTime = System.currentTimeMillis() + + var waitTime = targetTime - currentTime + if (waitTime <= 0) { + return currentTime + } + + val pollTime = { + if (waitTime / 10.0 > minPollTime) { + (waitTime / 10.0).toLong + } else { + minPollTime + } + } + + + while (true) { + currentTime = System.currentTimeMillis() + waitTime = targetTime - currentTime + + if (waitTime <= 0) { + + return currentTime + } + val sleepTime = + if (waitTime < pollTime) { + waitTime + } else { + pollTime + } + Thread.sleep(sleepTime) + } + return -1 + } +} + +class ManualClock() extends Clock { + + var time = 0L + + def currentTime() = time + + def addToTime(timeToAdd: Long) = { + this.synchronized { + time += timeToAdd + this.notifyAll() + } + } + def waitTillTime(targetTime: Long): Long = { + this.synchronized { + while (time < targetTime) { + this.wait(100) + } + } + return currentTime() + } +} diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala index 6125bb82eb..5da9fa6ecc 100644 --- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala @@ -1,6 +1,6 @@ package spark.streaming.util -class RecurringTimer(period: Long, callback: (Long) => Unit) { +class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { val minPollTime = 25L @@ -19,7 +19,7 @@ class RecurringTimer(period: Long, callback: (Long) => Unit) { var nextTime = 0L def start(): Long = { - nextTime = (math.floor(System.currentTimeMillis() / period) + 1).toLong * period + nextTime = (math.floor(clock.currentTime / period) + 1).toLong * period thread.start() nextTime } @@ -31,22 +31,32 @@ class RecurringTimer(period: Long, callback: (Long) => Unit) { def loop() { try { while (true) { - val beforeSleepTime = System.currentTimeMillis() - while (beforeSleepTime >= nextTime) { - callback(nextTime) - nextTime += period - } - val sleepTime = if (nextTime - beforeSleepTime < 2 * pollTime) { - nextTime - beforeSleepTime - } else { - pollTime - } - Thread.sleep(sleepTime) - val afterSleepTime = System.currentTimeMillis() + clock.waitTillTime(nextTime) + callback(nextTime) + nextTime += period } + } catch { case e: InterruptedException => } } } +object RecurringTimer { + + def main(args: Array[String]) { + var lastRecurTime = 0L + val period = 1000 + + def onRecur(time: Long) { + val currentTime = System.currentTimeMillis() + println("" + currentTime + ": " + (currentTime - lastRecurTime)) + lastRecurTime = currentTime + } + val timer = new RecurringTimer(new SystemClock(), period, onRecur) + timer.start() + Thread.sleep(30 * 1000) + timer.stop() + } +} + diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala index ce7c3d2e2b..2c10a03e6d 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala @@ -1,6 +1,8 @@ package spark.streaming -import spark.{Logging, RDD} +import spark.Logging +import spark.RDD +import spark.streaming.util.ManualClock import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter @@ -13,11 +15,13 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging { var ssc: SparkStreamContext = null val batchDurationMillis = 1000 + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + def testOp[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]]) { - try { + try { ssc = new SparkStreamContext("local", "test") ssc.setBatchDuration(Milliseconds(batchDurationMillis)) @@ -26,12 +30,14 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging { val outputQueue = outputStream.toQueue ssc.start() - Thread.sleep(batchDurationMillis * input.size) + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + clock.addToTime(input.size * batchDurationMillis) + + Thread.sleep(100) val output = new ArrayBuffer[Seq[V]]() while(outputQueue.size > 0) { - val rdd = outputQueue.take() - logInfo("Collecting RDD " + rdd.id + ", " + rdd.getClass.getSimpleName + ", " + rdd.splits.size) + val rdd = outputQueue.take() output += (rdd.collect()) } assert(output.size === expectedOutput.size) @@ -58,8 +64,14 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging { object DStreamSuite { def main(args: Array[String]) { - val r = new DStreamSuite() - val inputData = Array(1 to 4, 5 to 8, 9 to 12) - r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString))) + try { + val r = new DStreamSuite() + val inputData = Array(1 to 4, 5 to 8, 9 to 12) + r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString))) + + } catch { + case e: Exception => e.printStackTrace() + } + System.exit(0) } }
\ No newline at end of file |