author    Reynold Xin <rxin@apache.org>  2014-01-13 19:58:53 -0800
committer Reynold Xin <rxin@apache.org>  2014-01-13 19:58:53 -0800
commit    33022d6656530ffd272ed447af543473fb8de5e9 (patch)
tree      6c762a09056f747c3c9b41277b6648768b315c84
parent    30328c347bb9974f551fea2d3e9b19c6a9cd25a9 (diff)
Adjusted visibility of various components.
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala               |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/FutureAction.scala               |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/InterruptibleIterator.scala      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/Logging.scala                    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala        |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala    |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala              |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/package.scala                    |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala               |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala    | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala       |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/CompletionIterator.scala    | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala       |  8
-rw-r--r--  project/SparkBuild.scala                                               |  7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala (renamed from core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala) |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala | 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala) |  2
21 files changed, 66 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index e89ac28b8e..2ba871a600 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,7 +24,7 @@ import scala.collection.generic.Growable
import org.apache.spark.serializer.JavaSerializer
/**
- * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
+ * A datatype that can be accumulated, ie has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some datatypes,
@@ -185,7 +185,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
}
/**
- * A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same
+ * A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged.
*
* @param initialValue initial value of accumulator
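To illustrate the Accumulable/Accumulator distinction touched by this doc change, here is a minimal sketch of driver-side usage, as one might type in the Spark shell (the context name sc and the local master are assumptions; tasks may only add, and only the driver reads the merged value):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._             // brings in the implicit AccumulatorParam instances

    val sc = new SparkContext("local", "accumulator-example")   // hypothetical local context
    val sum = sc.accumulator(0)                                  // Accumulator[Int]: element type == result type
    sc.parallelize(1 to 100).foreach(x => sum += x)              // workers only add to it
    println(sum.value)                                           // the driver reads the merged result: 5050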
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index c6b4ac5192..d7d10285da 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD
/**
- * A future for the result of an action. This is an extension of the Scala Future interface to
- * support cancellation.
+ * A future for the result of an action to support cancellation. This is an extension of the
+ * Scala Future interface to support cancellation.
*/
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
@@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] {
/**
- * The future holding the result of an action that triggers a single job. Examples include
+ * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
/**
- * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
+ * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
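A minimal sketch of how such a FutureAction is obtained in practice (assuming a SparkContext named sc and that the asynchronous action variants such as countAsync are brought in via the usual implicit conversions; this is an illustration, not part of the patch):

    import org.apache.spark.SparkContext._                    // async action implicits
    import scala.concurrent.ExecutionContext.Implicits.global

    val future = sc.parallelize(1 to 1000000).countAsync()    // a SimpleFutureAction: triggers one job
    future.onComplete {
      case scala.util.Success(n)  => println("count = " + n)
      case scala.util.Failure(ex) => println("failed: " + ex)
    }
    // future.cancel()   // the extension over scala.concurrent.Future: cancels the underlying job(s)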
diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index 56e0b8d2c0..9b1601d5b9 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark
/**
* An iterator that wraps around an existing iterator to provide task killing functionality.
- * It works by checking the interrupted flag in TaskContext.
+ * It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
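As a rough sketch of how this wrapper is used inside Spark (the method shape and data below are illustrative assumptions), an RDD's compute method can wrap its raw partition iterator so that a killed task stops at the next element boundary:

    import org.apache.spark.{InterruptibleIterator, Partition, TaskContext}

    def compute(split: Partition, context: TaskContext): Iterator[Int] = {
      val underlying = Iterator.range(0, 1000000)         // stand-in for the real partition data
      new InterruptibleIterator(context, underlying)      // consults the task's interrupted flag as iteration proceeds
    }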
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9063cae87e..b749e5414d 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -122,7 +122,7 @@ trait Logging {
}
}
-object Logging {
+private object Logging {
@volatile private var initialized = false
val initLock = new Object()
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 0fc478a419..6bfe2cb4a2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark._
+private[spark]
abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
def value: T
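Broadcast values are still created through SparkContext; a minimal sketch of typical usage (assuming a SparkContext named sc):

    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))           // Broadcast[Map[Int, String]]
    val names = sc.parallelize(Seq(1, 2)).map(k => lookup.value(k))  // tasks read value, never modify it
    names.collect()                                                   // Array("one", "two")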
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index fb161ce69d..940e5ab805 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
-private[spark] trait BroadcastFactory {
+trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 0eacda3d7d..39ee0dbb92 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
}
}
-private[spark] class HttpBroadcastFactory extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium.
+ */
+class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 1d295c62bc..d351dfc1f5 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -236,8 +236,10 @@ private[spark] case class TorrentInfo(
@transient var hasBlocks = 0
}
-private[spark] class TorrentBroadcastFactory
- extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
+ */
+class TorrentBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
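Which of the two factories Spark instantiates is driven by configuration; a minimal sketch, assuming the spark.broadcast.factory key selects between the HTTP-based and torrent-based implementations (the app name is a placeholder):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("broadcast-factory-example")   // hypothetical app name
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")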
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index e133893f6c..9987e2300c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{AkkaUtils, Utils}
-import akka.actor.Actor.emptyBehavior
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
/**
* Proxy that relays messages to the driver.
*/
-class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 7507bf8ad0..cf6a23339d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -10,8 +10,9 @@ import org.apache.spark.util.Utils
/**
** Utilities for running commands with the spark classpath.
*/
+private[spark]
object CommandUtils extends Logging {
- private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+ def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")
// SPARK-698: do not call the run.cmd script, as process.destroy()
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 70a5a8caff..2625a7f6a5 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -29,6 +29,9 @@ package org.apache
* be saved as SequenceFiles. These operations are automatically available on any RDD of the right
* type (e.g. RDD[(Int, Int)] through implicit conversions when you
* `import org.apache.spark.SparkContext._`.
+ *
+ * Java programmers should reference the [[spark.api.java]] package
+ * for Spark programming APIs in Java.
*/
package object spark {
// For package docs only
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index d4f396afb5..8ef919c4b5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -27,7 +27,6 @@ import scala.io.Source
import scala.reflect.ClassTag
import org.apache.spark.{SparkEnv, Partition, TaskContext}
-import org.apache.spark.broadcast.Broadcast
/**
@@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag](
}
}
-object PipedRDD {
+private object PipedRDD {
// Split a string into words using a standard StringTokenizer
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 55a40a92c9..d8e97c3b7c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.util.Properties
import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{Logging, SparkContext, TaskEndReason}
+import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
sealed trait SparkListenerEvents
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
extends SparkListenerEvents
-case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
@@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+/**
+ * Interface for listening to events from the Spark scheduler.
+ */
trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
@@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging {
}
-object StatsReportListener extends Logging {
+private[spark] object StatsReportListener extends Logging {
//for profiling, the extremes are more interesting
val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
@@ -202,9 +205,9 @@ object StatsReportListener extends Logging {
}
}
+private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-object RuntimePercentage {
+private object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
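A minimal sketch of implementing the listener interface documented above and registering it (assuming a SparkContext named sc with an addSparkListener method; only the callback of interest is overridden):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    class StageLoggingListener extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        // stageCompleted.stage is the StageInfo carried by the event
        println("Stage completed: " + stageCompleted.stage)
      }
    }

    sc.addSparkListener(new StageLoggingListener)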
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 369a277232..48cec4be41 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
*
* This interface does not support concurrent writes.
*/
-abstract class BlockObjectWriter(val blockId: BlockId) {
+private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def open(): BlockObjectWriter
@@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
}
/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-class DiskBlockObjectWriter(
+private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 0f84810d6b..1b7934d59f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -108,6 +108,10 @@ class StorageLevel private(
}
+/**
+ * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
+ * new storage levels.
+ */
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
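A minimal sketch of applying one of these predefined levels (assuming a SparkContext named sc; MEMORY_AND_DISK keeps partitions in memory and spills to disk when they do not fit):

    import org.apache.spark.storage.StorageLevel

    val data = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK)
    data.count()   // the first action materializes and caches the partitions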
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index dc15a38b29..fcc1ca9502 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -18,14 +18,15 @@
package org.apache.spark.util
/**
- * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
+ * Wrapper around an iterator which calls a completion method after it successfully iterates
+ * through all the elements.
*/
-abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
- def next = sub.next
+private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+ def next() = sub.next()
def hasNext = {
val r = sub.hasNext
if (!r) {
- completion
+ completion()
}
r
}
@@ -33,7 +34,7 @@ abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterato
def completion()
}
-object CompletionIterator {
+private[spark] object CompletionIterator {
def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
new CompletionIterator[A,I](sub) {
def completion() = completionFunction
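Since the class becomes private[spark] here, the sketch below only compiles inside Spark itself; it shows the companion's apply running a cleanup callback once the wrapped iterator is exhausted:

    val source = Iterator(1, 2, 3)
    val it = CompletionIterator[Int, Iterator[Int]](source, println("done reading"))
    it.foreach(println)   // prints 1, 2, 3, then "done reading" once hasNext first returns false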
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index ac07a55cb9..b0febe906a 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,13 +18,13 @@
package org.apache.spark.util
import java.util.{TimerTask, Timer}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, Logging}
/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
-class MetadataCleaner(
+private[spark] class MetadataCleaner(
cleanerType: MetadataCleanerType.MetadataCleanerType,
cleanupFunc: (Long) => Unit,
conf: SparkConf)
@@ -60,7 +60,7 @@ class MetadataCleaner(
}
}
-object MetadataCleanerType extends Enumeration {
+private[spark] object MetadataCleanerType extends Enumeration {
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
@@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration {
// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
-object MetadataCleaner {
+private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf) = {
conf.getInt("spark.cleaner.ttl", -1)
}
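A minimal sketch of the configuration side mentioned in the TODO above: the cleaner delay comes from spark.cleaner.ttl (in seconds), which getDelaySeconds reads back from the SparkConf:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.cleaner.ttl", "3600")  // clean metadata older than one hour
    // Inside Spark, MetadataCleaner.getDelaySeconds(conf) now returns 3600 instead of the default -1.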
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c8b5f09ab5..d4e06dd2a1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -136,6 +136,13 @@ object SparkBuild extends Build {
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
testOptions in Test += Tests.Argument("-oDF"),
+ // Remove certain packages from Scaladoc
+ scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq(
+ "akka",
+ "org.apache.spark.network",
+ "org.apache.spark.deploy",
+ "org.apache.spark.util.collection"
+ ).mkString(":")),
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
diff --git a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 47e1b45004..b9c0596378 100644
--- a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.streaming.util
import scala.annotation.tailrec
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 6585d494a6..463617a713 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,17 @@
package org.apache.spark.streaming.util
-import java.nio.ByteBuffer
-import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
+import java.io.IOException
import java.net.ServerSocket
-import org.apache.spark.{SparkConf, Logging}
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import java.nio.ByteBuffer
+
import scala.io.Source
-import java.io.IOException
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.IntParam
/**
* A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
index a9dd0b1a5b..15f13d5b19 100644
--- a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.streaming.util
import org.scalatest.FunSuite
import java.io.ByteArrayOutputStream