aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-08-06 14:52:46 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-08-06 14:52:46 -0700
commitcae894ee7aefa4cf9b1952038a48be81e1d2a856 (patch)
treedc866109eb423e7ebfc9be646749e46ed0f3a30a /streaming
parent43b81eb2719c4666b7869d7d0290f2ee83daeafa (diff)
downloadspark-cae894ee7aefa4cf9b1952038a48be81e1d2a856.tar.gz
spark-cae894ee7aefa4cf9b1952038a48be81e1d2a856.tar.bz2
spark-cae894ee7aefa4cf9b1952038a48be81e1d2a856.zip
Added new Clock interface that is used by RecurringTimer to schedule events on system time or manually-configured time.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala1
-rw-r--r--streaming/src/main/scala/spark/streaming/Job.scala9
-rw-r--r--streaming/src/main/scala/spark/streaming/Scheduler.scala7
-rw-r--r--streaming/src/main/scala/spark/streaming/util/Clock.scala77
-rw-r--r--streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala38
-rw-r--r--streaming/src/test/scala/spark/streaming/DStreamSuite.scala28
6 files changed, 130 insertions, 30 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e19d2ecef5..c63c043415 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -197,7 +197,6 @@ extends Logging with Serializable {
private[streaming] def toQueue = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
- println("Added RDD " + rdd.id)
queue.add(rdd)
})
queue
diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala
index 2481a9a3ef..0bd8343b9a 100644
--- a/streaming/src/main/scala/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/spark/streaming/Job.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import java.util.concurrent.atomic.AtomicLong
+
class Job(val time: Time, func: () => _) {
val id = Job.getNewId()
def run(): Long = {
@@ -13,11 +15,8 @@ class Job(val time: Time, func: () => _) {
}
object Job {
- var lastId = 1
+ val id = new AtomicLong(0)
- def getNewId() = synchronized {
- lastId += 1
- lastId
- }
+ def getNewId() = id.getAndIncrement()
}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index fff4924b4c..309bd95525 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -1,6 +1,7 @@
package spark.streaming
import spark.streaming.util.RecurringTimer
+import spark.streaming.util.Clock
import spark.SparkEnv
import spark.Logging
@@ -20,8 +21,10 @@ extends Logging {
val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
- val timer = new RecurringTimer(ssc.batchDuration, generateRDDs(_))
-
+ val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
+ val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))
+
def start() {
val zeroTime = Time(timer.start())
diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala
new file mode 100644
index 0000000000..72e786e0c3
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala
@@ -0,0 +1,77 @@
+package spark.streaming.util
+
+import spark.streaming._
+
+trait Clock {
+ def currentTime(): Long
+ def waitTillTime(targetTime: Long): Long
+}
+
+
+class SystemClock() extends Clock {
+
+ val minPollTime = 25L
+
+ def currentTime(): Long = {
+ System.currentTimeMillis()
+ }
+
+ def waitTillTime(targetTime: Long): Long = {
+ var currentTime = 0L
+ currentTime = System.currentTimeMillis()
+
+ var waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+
+ val pollTime = {
+ if (waitTime / 10.0 > minPollTime) {
+ (waitTime / 10.0).toLong
+ } else {
+ minPollTime
+ }
+ }
+
+
+ while (true) {
+ currentTime = System.currentTimeMillis()
+ waitTime = targetTime - currentTime
+
+ if (waitTime <= 0) {
+
+ return currentTime
+ }
+ val sleepTime =
+ if (waitTime < pollTime) {
+ waitTime
+ } else {
+ pollTime
+ }
+ Thread.sleep(sleepTime)
+ }
+ return -1
+ }
+}
+
+class ManualClock() extends Clock {
+
+ var time = 0L
+
+ def currentTime() = time
+
+ def addToTime(timeToAdd: Long) = {
+ this.synchronized {
+ time += timeToAdd
+ this.notifyAll()
+ }
+ }
+ def waitTillTime(targetTime: Long): Long = {
+ this.synchronized {
+ while (time < targetTime) {
+ this.wait(100)
+ }
+ }
+ return currentTime()
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index 6125bb82eb..5da9fa6ecc 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -1,6 +1,6 @@
package spark.streaming.util
-class RecurringTimer(period: Long, callback: (Long) => Unit) {
+class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L
@@ -19,7 +19,7 @@ class RecurringTimer(period: Long, callback: (Long) => Unit) {
var nextTime = 0L
def start(): Long = {
- nextTime = (math.floor(System.currentTimeMillis() / period) + 1).toLong * period
+ nextTime = (math.floor(clock.currentTime / period) + 1).toLong * period
thread.start()
nextTime
}
@@ -31,22 +31,32 @@ class RecurringTimer(period: Long, callback: (Long) => Unit) {
def loop() {
try {
while (true) {
- val beforeSleepTime = System.currentTimeMillis()
- while (beforeSleepTime >= nextTime) {
- callback(nextTime)
- nextTime += period
- }
- val sleepTime = if (nextTime - beforeSleepTime < 2 * pollTime) {
- nextTime - beforeSleepTime
- } else {
- pollTime
- }
- Thread.sleep(sleepTime)
- val afterSleepTime = System.currentTimeMillis()
+ clock.waitTillTime(nextTime)
+ callback(nextTime)
+ nextTime += period
}
+
} catch {
case e: InterruptedException =>
}
}
}
+object RecurringTimer {
+
+ def main(args: Array[String]) {
+ var lastRecurTime = 0L
+ val period = 1000
+
+ def onRecur(time: Long) {
+ val currentTime = System.currentTimeMillis()
+ println("" + currentTime + ": " + (currentTime - lastRecurTime))
+ lastRecurTime = currentTime
+ }
+ val timer = new RecurringTimer(new SystemClock(), period, onRecur)
+ timer.start()
+ Thread.sleep(30 * 1000)
+ timer.stop()
+ }
+}
+
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
index ce7c3d2e2b..2c10a03e6d 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
@@ -1,6 +1,8 @@
package spark.streaming
-import spark.{Logging, RDD}
+import spark.Logging
+import spark.RDD
+import spark.streaming.util.ManualClock
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
@@ -13,11 +15,13 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
var ssc: SparkStreamContext = null
val batchDurationMillis = 1000
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
def testOp[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]]) {
- try {
+ try {
ssc = new SparkStreamContext("local", "test")
ssc.setBatchDuration(Milliseconds(batchDurationMillis))
@@ -26,12 +30,14 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
val outputQueue = outputStream.toQueue
ssc.start()
- Thread.sleep(batchDurationMillis * input.size)
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.addToTime(input.size * batchDurationMillis)
+
+ Thread.sleep(100)
val output = new ArrayBuffer[Seq[V]]()
while(outputQueue.size > 0) {
- val rdd = outputQueue.take()
- logInfo("Collecting RDD " + rdd.id + ", " + rdd.getClass.getSimpleName + ", " + rdd.splits.size)
+ val rdd = outputQueue.take()
output += (rdd.collect())
}
assert(output.size === expectedOutput.size)
@@ -58,8 +64,14 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
object DStreamSuite {
def main(args: Array[String]) {
- val r = new DStreamSuite()
- val inputData = Array(1 to 4, 5 to 8, 9 to 12)
- r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
+ try {
+ val r = new DStreamSuite()
+ val inputData = Array(1 to 4, 5 to 8, 9 to 12)
+ r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
+
+ } catch {
+ case e: Exception => e.printStackTrace()
+ }
+ System.exit(0)
}
} \ No newline at end of file