From 33022d6656530ffd272ed447af543473fb8de5e9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 13 Jan 2014 19:58:53 -0800 Subject: Adjusted visibility of various components. --- .../main/scala/org/apache/spark/Accumulators.scala | 4 +- .../main/scala/org/apache/spark/FutureAction.scala | 8 +-- .../org/apache/spark/InterruptibleIterator.scala | 2 +- core/src/main/scala/org/apache/spark/Logging.scala | 2 +- .../org/apache/spark/broadcast/Broadcast.scala | 1 + .../apache/spark/broadcast/BroadcastFactory.scala | 2 +- .../org/apache/spark/broadcast/HttpBroadcast.scala | 5 +- .../apache/spark/broadcast/TorrentBroadcast.scala | 6 +- .../scala/org/apache/spark/deploy/Client.scala | 3 +- .../apache/spark/deploy/worker/CommandUtils.scala | 3 +- core/src/main/scala/org/apache/spark/package.scala | 3 + .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 3 +- .../org/apache/spark/scheduler/SparkListener.scala | 13 ++-- .../apache/spark/storage/BlockObjectWriter.scala | 4 +- .../org/apache/spark/storage/StorageLevel.scala | 4 ++ .../org/apache/spark/util/CompletionIterator.scala | 11 +-- .../org/apache/spark/util/MetadataCleaner.scala | 8 +-- .../spark/util/RateLimitedOutputStream.scala | 79 ---------------------- .../spark/util/RateLimitedOutputStreamSuite.scala | 40 ----------- project/SparkBuild.scala | 7 ++ .../streaming/util/RateLimitedOutputStream.scala | 79 ++++++++++++++++++++++ .../spark/streaming/util/RawTextSender.scala | 13 ++-- .../util/RateLimitedOutputStreamSuite.scala | 40 +++++++++++ 23 files changed, 183 insertions(+), 157 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala delete mode 100644 core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index e89ac28b8e..2ba871a600 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -24,7 +24,7 @@ import scala.collection.generic.Growable import org.apache.spark.serializer.JavaSerializer /** - * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation, + * A datatype that can be accumulated, ie has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. * * You must define how to add data, and how to merge two of these together. For some datatypes, @@ -185,7 +185,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser } /** - * A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same + * A simpler value of [[Accumulable]] where the result type being accumulated is the same * as the types of elements being merged. * * @param initialValue initial value of accumulator diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index c6b4ac5192..d7d10285da 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD /** - * A future for the result of an action. 
This is an extension of the Scala Future interface to - * support cancellation. + * A future for the result of an action to support cancellation. This is an extension of the + * Scala Future interface to support cancellation. */ trait FutureAction[T] extends Future[T] { // Note that we redefine methods of the Future trait here explicitly so we can specify a different @@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] { /** - * The future holding the result of an action that triggers a single job. Examples include + * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. */ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) @@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: /** - * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take, + * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take, * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala index 56e0b8d2c0..9b1601d5b9 100644 --- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala +++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala @@ -19,7 +19,7 @@ package org.apache.spark /** * An iterator that wraps around an existing iterator to provide task killing functionality. - * It works by checking the interrupted flag in TaskContext. + * It works by checking the interrupted flag in [[TaskContext]]. */ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T]) extends Iterator[T] { diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index 9063cae87e..b749e5414d 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -122,7 +122,7 @@ trait Logging { } } -object Logging { +private object Logging { @volatile private var initialized = false val initLock = new Object() } diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 0fc478a419..6bfe2cb4a2 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark._ +private[spark] abstract class Broadcast[T](private[spark] val id: Long) extends Serializable { def value: T diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index fb161ce69d..940e5ab805 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -25,7 +25,7 @@ import org.apache.spark.SparkConf * BroadcastFactory implementation to instantiate a particular broadcast for the * entire Spark job. 
*/ -private[spark] trait BroadcastFactory { +trait BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf): Unit def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] def stop(): Unit diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 0eacda3d7d..39ee0dbb92 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea } } -private[spark] class HttpBroadcastFactory extends BroadcastFactory { +/** + * A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium. + */ +class HttpBroadcastFactory extends BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 1d295c62bc..d351dfc1f5 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -236,8 +236,10 @@ private[spark] case class TorrentInfo( @transient var hasBlocks = 0 } -private[spark] class TorrentBroadcastFactory - extends BroadcastFactory { +/** + * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast. + */ +class TorrentBroadcastFactory extends BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index e133893f6c..9987e2300c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.util.{AkkaUtils, Utils} -import akka.actor.Actor.emptyBehavior import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} /** * Proxy that relays messages to the driver. */ -class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging { +private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging { var masterActor: ActorSelection = _ val timeout = AkkaUtils.askTimeout(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 7507bf8ad0..cf6a23339d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -10,8 +10,9 @@ import org.apache.spark.util.Utils /** ** Utilities for running commands with the spark classpath. 
*/ +private[spark] object CommandUtils extends Logging { - private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { + def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java") // SPARK-698: do not call the run.cmd script, as process.destroy() diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 70a5a8caff..2625a7f6a5 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -29,6 +29,9 @@ package org.apache * be saved as SequenceFiles. These operations are automatically available on any RDD of the right * type (e.g. RDD[(Int, Int)] through implicit conversions when you * `import org.apache.spark.SparkContext._`. + * + * Java programmers should reference the [[spark.api.java]] package + * for Spark programming APIs in Java. */ package object spark { // For package docs only diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index d4f396afb5..8ef919c4b5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -27,7 +27,6 @@ import scala.io.Source import scala.reflect.ClassTag import org.apache.spark.{SparkEnv, Partition, TaskContext} -import org.apache.spark.broadcast.Broadcast /** @@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag]( } } -object PipedRDD { +private object PipedRDD { // Split a string into words using a standard StringTokenizer def tokenize(command: String): Seq[String] = { val buf = new ArrayBuffer[String] diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 55a40a92c9..d8e97c3b7c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.util.Properties import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.{Logging, SparkContext, TaskEndReason} +import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.executor.TaskMetrics sealed trait SparkListenerEvents @@ -27,7 +27,7 @@ sealed trait SparkListenerEvents case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties) extends SparkListenerEvents -case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents +case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents @@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents +/** + * Interface for listening to events from the Spark scheduler. 
+ */ trait SparkListener { /** * Called when a stage is completed, with information on the completed stage @@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging { } -object StatsReportListener extends Logging { +private[spark] object StatsReportListener extends Logging { //for profiling, the extremes are more interesting val percentiles = Array[Int](0,5,10,25,50,75,90,95,100) @@ -202,9 +205,9 @@ object StatsReportListener extends Logging { } } +private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) -case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) -object RuntimePercentage { +private object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { val denom = totalTime.toDouble val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 369a277232..48cec4be41 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer} * * This interface does not support concurrent writes. */ -abstract class BlockObjectWriter(val blockId: BlockId) { +private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { def open(): BlockObjectWriter @@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) { } /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */ -class DiskBlockObjectWriter( +private[spark] class DiskBlockObjectWriter( blockId: BlockId, file: File, serializer: Serializer, diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 0f84810d6b..1b7934d59f 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -108,6 +108,10 @@ class StorageLevel private( } +/** + * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating + * new storage levels. + */ object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala index dc15a38b29..fcc1ca9502 100644 --- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala @@ -18,14 +18,15 @@ package org.apache.spark.util /** - * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements + * Wrapper around an iterator which calls a completion method after it successfully iterates + * through all the elements. 
*/ -abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ - def next = sub.next +private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ + def next() = sub.next() def hasNext = { val r = sub.hasNext if (!r) { - completion + completion() } r } @@ -33,7 +34,7 @@ abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterato def completion() } -object CompletionIterator { +private[spark] object CompletionIterator { def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = { new CompletionIterator[A,I](sub) { def completion() = completionFunction diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index ac07a55cb9..b0febe906a 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -18,13 +18,13 @@ package org.apache.spark.util import java.util.{TimerTask, Timer} -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{SparkConf, Logging} /** * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries) */ -class MetadataCleaner( +private[spark] class MetadataCleaner( cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit, conf: SparkConf) @@ -60,7 +60,7 @@ class MetadataCleaner( } } -object MetadataCleanerType extends Enumeration { +private[spark] object MetadataCleanerType extends Enumeration { val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value @@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration { // TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the // initialization of StreamingContext. It's okay for users trying to configure stuff themselves. -object MetadataCleaner { +private[spark] object MetadataCleaner { def getDelaySeconds(conf: SparkConf) = { conf.getInt("spark.cleaner.ttl", -1) } diff --git a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala deleted file mode 100644 index 47e1b45004..0000000000 --- a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import scala.annotation.tailrec - -import java.io.OutputStream -import java.util.concurrent.TimeUnit._ - -class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream { - val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS) - val CHUNK_SIZE = 8192 - var lastSyncTime = System.nanoTime - var bytesWrittenSinceSync: Long = 0 - - override def write(b: Int) { - waitToWrite(1) - out.write(b) - } - - override def write(bytes: Array[Byte]) { - write(bytes, 0, bytes.length) - } - - @tailrec - override final def write(bytes: Array[Byte], offset: Int, length: Int) { - val writeSize = math.min(length - offset, CHUNK_SIZE) - if (writeSize > 0) { - waitToWrite(writeSize) - out.write(bytes, offset, writeSize) - write(bytes, offset + writeSize, length) - } - } - - override def flush() { - out.flush() - } - - override def close() { - out.close() - } - - @tailrec - private def waitToWrite(numBytes: Int) { - val now = System.nanoTime - val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS) - val rate = bytesWrittenSinceSync.toDouble / elapsedSecs - if (rate < bytesPerSec) { - // It's okay to write; just update some variables and return - bytesWrittenSinceSync += numBytes - if (now > lastSyncTime + SYNC_INTERVAL) { - // Sync interval has passed; let's resync - lastSyncTime = now - bytesWrittenSinceSync = numBytes - } - } else { - // Calculate how much time we should sleep to bring ourselves to the desired rate. - // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) - val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) - if (sleepTime > 0) Thread.sleep(sleepTime) - waitToWrite(numBytes) - } - } -} diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala deleted file mode 100644 index a9dd0b1a5b..0000000000 --- a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import org.scalatest.FunSuite -import java.io.ByteArrayOutputStream -import java.util.concurrent.TimeUnit._ - -class RateLimitedOutputStreamSuite extends FunSuite { - - private def benchmark[U](f: => U): Long = { - val start = System.nanoTime - f - System.nanoTime - start - } - - test("write") { - val underlying = new ByteArrayOutputStream - val data = "X" * 41000 - val stream = new RateLimitedOutputStream(underlying, 10000) - val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) } - assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4) - assert(underlying.toString("UTF-8") == data) - } -} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c8b5f09ab5..d4e06dd2a1 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -136,6 +136,13 @@ object SparkBuild extends Build { javaOptions += "-Xmx3g", // Show full stack trace and duration in test cases. testOptions in Test += Tests.Argument("-oDF"), + // Remove certain packages from Scaladoc + scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq( + "akka", + "org.apache.spark.network", + "org.apache.spark.deploy", + "org.apache.spark.util.collection" + ).mkString(":")), // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala new file mode 100644 index 0000000000..b9c0596378 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.util + +import scala.annotation.tailrec + +import java.io.OutputStream +import java.util.concurrent.TimeUnit._ + +class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream { + val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS) + val CHUNK_SIZE = 8192 + var lastSyncTime = System.nanoTime + var bytesWrittenSinceSync: Long = 0 + + override def write(b: Int) { + waitToWrite(1) + out.write(b) + } + + override def write(bytes: Array[Byte]) { + write(bytes, 0, bytes.length) + } + + @tailrec + override final def write(bytes: Array[Byte], offset: Int, length: Int) { + val writeSize = math.min(length - offset, CHUNK_SIZE) + if (writeSize > 0) { + waitToWrite(writeSize) + out.write(bytes, offset, writeSize) + write(bytes, offset + writeSize, length) + } + } + + override def flush() { + out.flush() + } + + override def close() { + out.close() + } + + @tailrec + private def waitToWrite(numBytes: Int) { + val now = System.nanoTime + val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS) + val rate = bytesWrittenSinceSync.toDouble / elapsedSecs + if (rate < bytesPerSec) { + // It's okay to write; just update some variables and return + bytesWrittenSinceSync += numBytes + if (now > lastSyncTime + SYNC_INTERVAL) { + // Sync interval has passed; let's resync + lastSyncTime = now + bytesWrittenSinceSync = numBytes + } + } else { + // Calculate how much time we should sleep to bring ourselves to the desired rate. + // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) + val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) + if (sleepTime > 0) Thread.sleep(sleepTime) + waitToWrite(numBytes) + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 6585d494a6..463617a713 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -17,14 +17,17 @@ package org.apache.spark.streaming.util -import java.nio.ByteBuffer -import org.apache.spark.util.{RateLimitedOutputStream, IntParam} +import java.io.IOException import java.net.ServerSocket -import org.apache.spark.{SparkConf, Logging} -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +import java.nio.ByteBuffer + import scala.io.Source -import java.io.IOException + +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream + +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.util.IntParam /** * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala new file mode 100644 index 0000000000..15f13d5b19 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import org.scalatest.FunSuite +import java.io.ByteArrayOutputStream +import java.util.concurrent.TimeUnit._ + +class RateLimitedOutputStreamSuite extends FunSuite { + + private def benchmark[U](f: => U): Long = { + val start = System.nanoTime + f + System.nanoTime - start + } + + test("write") { + val underlying = new ByteArrayOutputStream + val data = "X" * 41000 + val stream = new RateLimitedOutputStream(underlying, 10000) + val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) } + assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4) + assert(underlying.toString("UTF-8") == data) + } +} -- cgit v1.2.3
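
For reference only (not part of the commit): a minimal Scala sketch of how the relocated
org.apache.spark.streaming.util.RateLimitedOutputStream is used, mirroring the behaviour the
moved test suite asserts. The driver object name is illustrative; only the constructor
signature (out: OutputStream, bytesPerSec: Int) comes from the patch above.

import java.io.ByteArrayOutputStream

import org.apache.spark.streaming.util.RateLimitedOutputStream

// Illustrative driver, not shipped with the patch: wrap an in-memory stream and cap
// throughput at 10,000 bytes/sec, so writing 41,000 bytes blocks for roughly 4 seconds.
object RateLimitedOutputStreamExample {
  def main(args: Array[String]) {
    val underlying = new ByteArrayOutputStream
    val stream = new RateLimitedOutputStream(underlying, 10000)
    stream.write(("X" * 41000).getBytes("UTF-8"))
    stream.flush()
    println("wrote " + underlying.size() + " bytes")
  }
}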