aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRyan LeCompte <lecompte@gmail.com>2013-01-11 13:30:49 -0800
committerRyan LeCompte <lecompte@gmail.com>2013-01-11 13:30:49 -0800
commit22445fbea9ed1575e49a1f9bb2251d98a57b9e4e (patch)
treebb1ba4d85d810bc024aee697e667fc05c6627ae0 /core
parentf7cf035b9b8c6c84cc4f27c8f2334e99e417ce8a (diff)
downloadspark-22445fbea9ed1575e49a1f9bb2251d98a57b9e4e.tar.gz
spark-22445fbea9ed1575e49a1f9bb2251d98a57b9e4e.tar.bz2
spark-22445fbea9ed1575e49a1f9bb2251d98a57b9e4e.zip
attempt to sleep for more accurate time period, minor cleanup
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/util/RateLimitedOutputStream.scala19
1 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
index d11ed163ce..3050213709 100644
--- a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
+++ b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
@@ -1,8 +1,10 @@
package spark.util
import java.io.OutputStream
+import java.util.concurrent.TimeUnit._
class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
+ val SyncIntervalNs = NANOSECONDS.convert(10, SECONDS)
var lastSyncTime = System.nanoTime()
var bytesWrittenSinceSync: Long = 0
@@ -28,20 +30,21 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
def waitToWrite(numBytes: Int) {
while (true) {
- val now = System.nanoTime()
- val elapsed = math.max(now - lastSyncTime, 1)
- val rate = bytesWrittenSinceSync.toDouble / (elapsed / 1.0e9)
+ val now = System.nanoTime
+ val elapsedSecs = SECONDS.convert(max(now - lastSyncTime, 1), NANOSECONDS)
+ val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
if (rate < bytesPerSec) {
// It's okay to write; just update some variables and return
bytesWrittenSinceSync += numBytes
- if (now > lastSyncTime + (1e10).toLong) {
- // Ten seconds have passed since lastSyncTime; let's resync
+ if (now > lastSyncTime + SyncIntervalNs) {
+ // Sync interval has passed; let's resync
lastSyncTime = now
bytesWrittenSinceSync = numBytes
}
- return
} else {
- Thread.sleep(5)
+ // Calculate how much time we should sleep to bring ourselves to the desired rate.
+ val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+ if (sleepTime > 0) Thread.sleep(sleepTime)
}
}
}
@@ -53,4 +56,4 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
override def close() {
out.close()
}
-} \ No newline at end of file
+}