diff options
author | Ryan LeCompte <lecompte@gmail.com> | 2013-01-11 13:30:49 -0800 |
---|---|---|
committer | Ryan LeCompte <lecompte@gmail.com> | 2013-01-11 13:30:49 -0800 |
commit | 22445fbea9ed1575e49a1f9bb2251d98a57b9e4e (patch) | |
tree | bb1ba4d85d810bc024aee697e667fc05c6627ae0 /core | |
parent | f7cf035b9b8c6c84cc4f27c8f2334e99e417ce8a (diff) | |
download | spark-22445fbea9ed1575e49a1f9bb2251d98a57b9e4e.tar.gz spark-22445fbea9ed1575e49a1f9bb2251d98a57b9e4e.tar.bz2 spark-22445fbea9ed1575e49a1f9bb2251d98a57b9e4e.zip |
attempt to sleep for more accurate time period, minor cleanup
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/util/RateLimitedOutputStream.scala | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala index d11ed163ce..3050213709 100644 --- a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala +++ b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala @@ -1,8 +1,10 @@ package spark.util import java.io.OutputStream +import java.util.concurrent.TimeUnit._ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream { + val SyncIntervalNs = NANOSECONDS.convert(10, SECONDS) var lastSyncTime = System.nanoTime() var bytesWrittenSinceSync: Long = 0 @@ -28,20 +30,21 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu def waitToWrite(numBytes: Int) { while (true) { - val now = System.nanoTime() - val elapsed = math.max(now - lastSyncTime, 1) - val rate = bytesWrittenSinceSync.toDouble / (elapsed / 1.0e9) + val now = System.nanoTime + val elapsedSecs = SECONDS.convert(max(now - lastSyncTime, 1), NANOSECONDS) + val rate = bytesWrittenSinceSync.toDouble / elapsedSecs if (rate < bytesPerSec) { // It's okay to write; just update some variables and return bytesWrittenSinceSync += numBytes - if (now > lastSyncTime + (1e10).toLong) { - // Ten seconds have passed since lastSyncTime; let's resync + if (now > lastSyncTime + SyncIntervalNs) { + // Sync interval has passed; let's resync lastSyncTime = now bytesWrittenSinceSync = numBytes } - return } else { - Thread.sleep(5) + // Calculate how much time we should sleep to bring ourselves to the desired rate. + val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) + if (sleepTime > 0) Thread.sleep(sleepTime) } } } @@ -53,4 +56,4 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu override def close() { out.close() } -}
\ No newline at end of file +} |