diff options
author | Ryan LeCompte <lecompte@gmail.com> | 2013-01-13 10:01:56 -0800 |
---|---|---|
committer | Ryan LeCompte <lecompte@gmail.com> | 2013-01-13 10:01:56 -0800 |
commit | 2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d (patch) | |
tree | 31346436aed7eb47f3b2e54f11811db26b78db9f /core | |
parent | addff2c466d4b76043e612d4d28ab9de7f003298 (diff) | |
download | spark-2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d.tar.gz spark-2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d.tar.bz2 spark-2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d.zip |
more code cleanup
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/util/RateLimitedOutputStream.scala | 63 |
1 files changed, 32 insertions, 31 deletions
diff --git a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala index ed3d2b66bb..10790a9eee 100644 --- a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala +++ b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala @@ -1,11 +1,14 @@ package spark.util +import scala.annotation.tailrec + import java.io.OutputStream import java.util.concurrent.TimeUnit._ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream { val SyncIntervalNs = NANOSECONDS.convert(10, SECONDS) - var lastSyncTime = System.nanoTime() + val ChunkSize = 8192 + var lastSyncTime = System.nanoTime var bytesWrittenSinceSync: Long = 0 override def write(b: Int) { @@ -17,37 +20,13 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu write(bytes, 0, bytes.length) } - override def write(bytes: Array[Byte], offset: Int, length: Int) { - val CHUNK_SIZE = 8192 - var pos = 0 - while (pos < length) { - val writeSize = math.min(length - pos, CHUNK_SIZE) + @tailrec + override final def write(bytes: Array[Byte], offset: Int, length: Int) { + val writeSize = math.min(length - offset, ChunkSize) + if (writeSize > 0) { waitToWrite(writeSize) - out.write(bytes, offset + pos, writeSize) - pos += writeSize - } - } - - def waitToWrite(numBytes: Int) { - while (true) { - val now = System.nanoTime - val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS) - val rate = bytesWrittenSinceSync.toDouble / elapsedSecs - if (rate < bytesPerSec) { - // It's okay to write; just update some variables and return - bytesWrittenSinceSync += numBytes - if (now > lastSyncTime + SyncIntervalNs) { - // Sync interval has passed; let's resync - lastSyncTime = now - bytesWrittenSinceSync = numBytes - } - return - } else { - // Calculate how much time we should sleep to bring ourselves to the desired rate. - // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) - val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) - if (sleepTime > 0) Thread.sleep(sleepTime) - } + out.write(bytes, offset, writeSize) + write(bytes, offset + writeSize, length) } } @@ -58,4 +37,26 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu override def close() { out.close() } + + @tailrec + private def waitToWrite(numBytes: Int) { + val now = System.nanoTime + val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS) + val rate = bytesWrittenSinceSync.toDouble / elapsedSecs + if (rate < bytesPerSec) { + // It's okay to write; just update some variables and return + bytesWrittenSinceSync += numBytes + if (now > lastSyncTime + SyncIntervalNs) { + // Sync interval has passed; let's resync + lastSyncTime = now + bytesWrittenSinceSync = numBytes + } + } else { + // Calculate how much time we should sleep to bring ourselves to the desired rate. + // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) + val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) + if (sleepTime > 0) Thread.sleep(sleepTime) + waitToWrite(numBytes) + } + } } |