aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-12-29 18:31:51 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-12-29 18:31:51 -0800
commit9e644402c155b5fc68794a17c36ddd19d3242f4f (patch)
tree0fd01d0fb798d9cf2764f1ed666694fabdbb942a /streaming/src
parent0bc0a60d3001dd231e13057a838d4b6550e5a2b9 (diff)
downloadspark-9e644402c155b5fc68794a17c36ddd19d3242f4f.tar.gz
spark-9e644402c155b5fc68794a17c36ddd19d3242f4f.tar.bz2
spark-9e644402c155b5fc68794a17c36ddd19d3242f4f.zip
Improved jekyll and scala docs. Made many classes and method private to remove them from scala docs.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala5
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala249
-rw-r--r--streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/Interval.scala1
-rw-r--r--streaming/src/main/scala/spark/streaming/Job.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/JobManager.scala1
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala8
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala8
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/Scheduler.scala7
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala43
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala2
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala2
16 files changed, 201 insertions, 139 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 770f7b0cc0..11a7232d7b 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
+private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
@@ -30,6 +31,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
+private[streaming]
class CheckpointWriter(checkpointDir: String) extends Logging {
val file = new Path(checkpointDir, "graph")
val conf = new Configuration()
@@ -65,7 +67,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
}
-
+private[streaming]
object CheckpointReader extends Logging {
def read(path: String): Checkpoint = {
@@ -103,6 +105,7 @@ object CheckpointReader extends Logging {
}
}
+private[streaming]
class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d5048aeed7..3834b57ed3 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations
+ * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
@@ -38,33 +38,28 @@ import org.apache.hadoop.conf.Configuration
* - A function that is used to generate an RDD after each time interval
*/
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-
-abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
-extends Serializable with Logging {
+abstract class DStream[T: ClassManifest] (
+ @transient protected[streaming] var ssc: StreamingContext
+ ) extends Serializable with Logging {
initLogging()
- /**
- * ----------------------------------------------
- * Methods that must be implemented by subclasses
- * ----------------------------------------------
- */
+ // =======================================================================
+ // Methods that should be implemented by subclasses of DStream
+ // =======================================================================
- // Time interval at which the DStream generates an RDD
+ /** Time interval after which the DStream generates a RDD */
def slideTime: Time
- // List of parent DStreams on which this DStream depends on
+ /** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
- // Key method that computes RDD for a valid time
+ /** Method that generates a RDD for the given time */
def compute (validTime: Time): Option[RDD[T]]
- /**
- * ---------------------------------------
- * Other general fields and methods of DStream
- * ---------------------------------------
- */
+ // =======================================================================
+ // Methods and fields available on all DStreams
+ // =======================================================================
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
@@ -87,12 +82,15 @@ extends Serializable with Logging {
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
- def isInitialized = (zeroTime != null)
+ protected[streaming] def isInitialized = (zeroTime != null)
// Duration for which the DStream requires its parent DStream to remember each RDD created
- def parentRememberDuration = rememberDuration
+ protected[streaming] def parentRememberDuration = rememberDuration
+
+ /** Returns the StreamingContext associated with this DStream */
+ def context() = ssc
- // Set caching level for the RDDs created by this DStream
+ /** Persists the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -102,11 +100,16 @@ extends Serializable with Logging {
this
}
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
-
- // Turn on the default caching level for this RDD
+
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
def checkpoint(interval: Time): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
@@ -285,7 +288,7 @@ extends Serializable with Logging {
* Generates a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. PerRDDForEachDStream).
+ * (eg. ForEachDStream).
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -420,65 +423,96 @@ extends Serializable with Logging {
generatedRDDs = new HashMap[Time, RDD[T]] ()
}
- /**
- * --------------
- * DStream operations
- * --------------
- */
+ // =======================================================================
+ // DStream operations
+ // =======================================================================
+
+ /** Returns a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
+ /**
+ * Returns a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
+ /** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
def glom(): DStream[Array[T]] = new GlommedDStream(this)
- def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U: ClassManifest](
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean = false
+ ): DStream[U] = {
+ new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
}
- def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _)
-
- def collect(): DStream[Seq[T]] = this.map(x => (null, x)).groupByKey(1).map(_._2)
-
- def foreach(foreachFunc: T => Unit) {
- val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
- }
- def foreachRDD(foreachFunc: RDD[T] => Unit) {
- foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: RDD[T] => Unit) {
+ foreach((r: RDD[T], t: Time) => foreachFunc(r))
}
- def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
- def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transformRDD((r: RDD[T], t: Time) => transformFunc(r))
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transform((r: RDD[T], t: Time) => transformFunc(r))
}
- def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- def toBlockingQueue() = {
- val queue = new ArrayBlockingQueue[RDD[T]](10000)
- this.foreachRDD(rdd => {
- queue.add(rdd)
- })
- queue
- }
-
+ /**
+ * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and there materialized.
+ */
def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
@@ -489,18 +523,42 @@ extends Serializable with Logging {
if (first11.size > 10) println("...")
println()
}
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @return
+ */
def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowTime duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideTime sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
def window(windowTime: Time, slideTime: Time): DStream[T] = {
new WindowedDStream(this, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream which computed based on tumbling window on this DStream.
+ * This is equivalent to window(batchTime, batchTime).
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ */
def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ */
def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
this.window(windowTime, slideTime).reduce(reduceFunc)
}
@@ -516,17 +574,31 @@ extends Serializable with Logging {
.map(_._2)
}
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ */
def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ */
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
- def slice(interval: Interval): Seq[RDD[T]] = {
+ /**
+ * Returns all the RDDs defined by the Interval object (both end times included)
+ */
+ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
- // Get all the RDDs between fromTime to toTime (both included)
+ /**
+ * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
var time = toTime.floor(slideTime)
@@ -540,20 +612,26 @@ extends Serializable with Logging {
rdds.toSeq
}
+ /**
+ * Saves each RDD in this DStream as a Sequence file of serialized objects.
+ */
def saveAsObjectFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
+ /**
+ * Saves each RDD in this DStream as at text file, using string representation of elements.
+ */
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
def register() {
@@ -561,6 +639,8 @@ extends Serializable with Logging {
}
}
+private[streaming]
+case class DStreamCheckpointData(rdds: HashMap[Time, Any])
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
@@ -583,6 +663,7 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
* TODO
*/
+private[streaming]
class MappedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
mapFunc: T => U
@@ -602,6 +683,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] (
* TODO
*/
+private[streaming]
class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
@@ -621,6 +703,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
* TODO
*/
+private[streaming]
class FilteredDStream[T: ClassManifest](
parent: DStream[T],
filterFunc: T => Boolean
@@ -640,9 +723,11 @@ class FilteredDStream[T: ClassManifest](
* TODO
*/
+private[streaming]
class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U]
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {
override def dependencies = List(parent)
@@ -650,7 +735,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
override def slideTime: Time = parent.slideTime
override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc))
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
}
@@ -659,6 +744,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
* TODO
*/
+private[streaming]
class GlommedDStream[T: ClassManifest](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
@@ -676,6 +762,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T])
* TODO
*/
+private[streaming]
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
parent: DStream[(K,V)],
createCombiner: V => C,
@@ -702,6 +789,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
* TODO
*/
+private[streaming]
class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent: DStream[(K, V)],
mapValueFunc: V => U
@@ -720,7 +808,7 @@ class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
/**
* TODO
*/
-
+private[streaming]
class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent: DStream[(K, V)],
flatMapValueFunc: V => TraversableOnce[U]
@@ -779,38 +867,8 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
* TODO
*/
-class PerElementForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: T => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- val sparkJobFunc = {
- (iterator: Iterator[T]) => iterator.foreach(foreachFunc)
- }
- ssc.sc.runJob(rdd, sparkJobFunc)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerRDDForEachDStream[T: ClassManifest] (
+private[streaming]
+class ForEachDStream[T: ClassManifest] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
@@ -838,6 +896,7 @@ class PerRDDForEachDStream[T: ClassManifest] (
* TODO
*/
+private[streaming]
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
index 2959ce4540..5ac7e5b08e 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
@@ -79,7 +79,7 @@ class SparkFlumeEvent() extends Externalizable {
}
}
-object SparkFlumeEvent {
+private[streaming] object SparkFlumeEvent {
def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index ffb7725ac9..fa0b7ce19d 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,5 +1,6 @@
package spark.streaming
+private[streaming]
case class Interval(beginTime: Time, endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala
index 0bcb6fd8dc..67bd8388bc 100644
--- a/streaming/src/main/scala/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/spark/streaming/Job.scala
@@ -2,6 +2,7 @@ package spark.streaming
import java.util.concurrent.atomic.AtomicLong
+private[streaming]
class Job(val time: Time, func: () => _) {
val id = Job.getNewId()
def run(): Long = {
@@ -14,6 +15,7 @@ class Job(val time: Time, func: () => _) {
override def toString = "streaming job " + id + " @ " + time
}
+private[streaming]
object Job {
val id = new AtomicLong(0)
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..fda7264a27 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -5,6 +5,7 @@ import spark.SparkEnv
import java.util.concurrent.Executors
+private[streaming]
class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
index 4e4e9fc942..4bf13dd50c 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
@@ -40,10 +40,10 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
}
-sealed trait NetworkReceiverMessage
-case class StopReceiver(msg: String) extends NetworkReceiverMessage
-case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
-case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b421f795ee..658498dfc1 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -11,10 +11,10 @@ import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._
-trait NetworkInputTrackerMessage
-case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
-case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
class NetworkInputTracker(
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 720e63bba0..f9fef14196 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -281,7 +281,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
@@ -303,7 +303,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
private def getKeyClass() = implicitly[ClassManifest[K]].erasure
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 014021be61..fd1fa77a24 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -7,11 +7,8 @@ import spark.Logging
import scala.collection.mutable.HashMap
-sealed trait SchedulerMessage
-case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage
-
-class Scheduler(ssc: StreamingContext)
-extends Logging {
+private[streaming]
+class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ce47bcb2da..998fea849f 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -48,7 +48,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Recreates the StreamingContext from a checkpoint file.
+ * Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -61,7 +61,7 @@ class StreamingContext private (
"both SparkContext and checkpoint as null")
}
- val isCheckpointPresent = (cp_ != null)
+ protected[streaming] val isCheckpointPresent = (cp_ != null)
val sc: SparkContext = {
if (isCheckpointPresent) {
@@ -71,9 +71,9 @@ class StreamingContext private (
}
}
- val env = SparkEnv.get
+ protected[streaming] val env = SparkEnv.get
- val graph: DStreamGraph = {
+ protected[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
@@ -86,10 +86,10 @@ class StreamingContext private (
}
}
- private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
- private[streaming] var networkInputTracker: NetworkInputTracker = null
+ protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ protected[streaming] var networkInputTracker: NetworkInputTracker = null
- private[streaming] var checkpointDir: String = {
+ protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
cp_.checkpointDir
@@ -98,9 +98,9 @@ class StreamingContext private (
}
}
- private[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
- private[streaming] var receiverJobThread: Thread = null
- private[streaming] var scheduler: Scheduler = null
+ protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+ protected[streaming] var receiverJobThread: Thread = null
+ protected[streaming] var scheduler: Scheduler = null
def remember(duration: Time) {
graph.remember(duration)
@@ -117,11 +117,11 @@ class StreamingContext private (
}
}
- private[streaming] def getInitialCheckpoint(): Checkpoint = {
+ protected[streaming] def getInitialCheckpoint(): Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
* Create an input stream that pulls messages form a Kafka Broker.
@@ -188,7 +188,7 @@ class StreamingContext private (
}
/**
- * This function creates a input stream that monitors a Hadoop-compatible filesystem
+ * Creates a input stream that monitors a Hadoop-compatible filesystem
* for new files and executes the necessary processing on them.
*/
def fileStream[
@@ -206,7 +206,7 @@ class StreamingContext private (
}
/**
- * This function create a input stream from an queue of RDDs. In each batch,
+ * Creates a input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue
*/
def queueStream[T: ClassManifest](
@@ -231,22 +231,21 @@ class StreamingContext private (
}
/**
- * This function registers a InputDStream as an input stream that will be
- * started (InputDStream.start() called) to get the input data streams.
+ * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
graph.addInputStream(inputStream)
}
/**
- * This function registers a DStream as an output stream that will be
- * computed every interval.
+ * Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
}
- def validate() {
+ protected def validate() {
assert(graph != null, "Graph is null")
graph.validate()
@@ -304,7 +303,7 @@ class StreamingContext private (
object StreamingContext {
- def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+ protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
@@ -318,7 +317,7 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
- def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+ protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
time.millis.toString
} else if (suffix == null || suffix.length ==0) {
@@ -328,7 +327,7 @@ object StreamingContext {
}
}
- def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index 6cb2b4c042..7c4ee3b34c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -26,7 +26,7 @@ object GrepRaw {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = new UnionDStream(rawStreams)
- union.filter(_.contains("Alice")).count().foreachRDD(r =>
+ union.filter(_.contains("Alice")).count().foreach(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index fe4c2bf155..182dfd8a52 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -38,7 +38,7 @@ object TopKWordCountRaw {
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreachRDD(rdd => {
+ partialTopKWindowedCounts.foreach(rdd => {
val collectedCounts = rdd.collect
println("Collected " + collectedCounts.size + " words from partial top words")
println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index a29c81d437..9bcd30f4d7 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -36,7 +36,7 @@ object WordCountRaw {
val union = new UnionDStream(lines.toArray)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
+ windowedCounts.foreach(r => println("# unique words = " + r.count()))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 68be6b7893..a191321d91 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -72,7 +72,7 @@ object PageViewStream {
case "popularUsersSeen" =>
// Look for users in our existing dataset and print it out if we have a match
pageViews.map(view => (view.userID, 1))
- .foreachRDD((rdd, time) => rdd.join(userList)
+ .foreach((rdd, time) => rdd.join(userList)
.map(_._2._2)
.take(10)
.foreach(u => println("Saw user %s at time %s".format(u, time))))
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 8cc2f8ccfc..a44f738957 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -35,7 +35,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
- extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {