aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRyan LeCompte <lecompte@gmail.com>2013-01-13 10:01:56 -0800
committerRyan LeCompte <lecompte@gmail.com>2013-01-13 10:01:56 -0800
commit2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d (patch)
tree31346436aed7eb47f3b2e54f11811db26b78db9f /core
parentaddff2c466d4b76043e612d4d28ab9de7f003298 (diff)
downloadspark-2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d.tar.gz
spark-2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d.tar.bz2
spark-2305a2c1d91273a93ee6b571b0cd4bcaa1b2969d.zip
more code cleanup
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/util/RateLimitedOutputStream.scala63
1 file changed, 32 insertions, 31 deletions
diff --git a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
index ed3d2b66bb..10790a9eee 100644
--- a/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
+++ b/core/src/main/scala/spark/util/RateLimitedOutputStream.scala
@@ -1,11 +1,14 @@
package spark.util
+import scala.annotation.tailrec
+
import java.io.OutputStream
import java.util.concurrent.TimeUnit._
class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
val SyncIntervalNs = NANOSECONDS.convert(10, SECONDS)
- var lastSyncTime = System.nanoTime()
+ val ChunkSize = 8192
+ var lastSyncTime = System.nanoTime
var bytesWrittenSinceSync: Long = 0
override def write(b: Int) {
@@ -17,37 +20,13 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
write(bytes, 0, bytes.length)
}
- override def write(bytes: Array[Byte], offset: Int, length: Int) {
- val CHUNK_SIZE = 8192
- var pos = 0
- while (pos < length) {
- val writeSize = math.min(length - pos, CHUNK_SIZE)
+ @tailrec
+ override final def write(bytes: Array[Byte], offset: Int, length: Int) {
+ val writeSize = math.min(length - offset, ChunkSize)
+ if (writeSize > 0) {
waitToWrite(writeSize)
- out.write(bytes, offset + pos, writeSize)
- pos += writeSize
- }
- }
-
- def waitToWrite(numBytes: Int) {
- while (true) {
- val now = System.nanoTime
- val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
- val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
- if (rate < bytesPerSec) {
- // It's okay to write; just update some variables and return
- bytesWrittenSinceSync += numBytes
- if (now > lastSyncTime + SyncIntervalNs) {
- // Sync interval has passed; let's resync
- lastSyncTime = now
- bytesWrittenSinceSync = numBytes
- }
- return
- } else {
- // Calculate how much time we should sleep to bring ourselves to the desired rate.
- // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
- val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
- if (sleepTime > 0) Thread.sleep(sleepTime)
- }
+ out.write(bytes, offset, writeSize)
+ write(bytes, offset + writeSize, length)
}
}
@@ -58,4 +37,26 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
override def close() {
out.close()
}
+
+ @tailrec
+ private def waitToWrite(numBytes: Int) {
+ val now = System.nanoTime
+ val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
+ val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
+ if (rate < bytesPerSec) {
+ // It's okay to write; just update some variables and return
+ bytesWrittenSinceSync += numBytes
+ if (now > lastSyncTime + SyncIntervalNs) {
+ // Sync interval has passed; let's resync
+ lastSyncTime = now
+ bytesWrittenSinceSync = numBytes
+ }
+ } else {
+ // Calculate how much time we should sleep to bring ourselves to the desired rate.
+ // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+ val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+ if (sleepTime > 0) Thread.sleep(sleepTime)
+ waitToWrite(numBytes)
+ }
+ }
}