From 9e644402c155b5fc68794a17c36ddd19d3242f4f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 29 Dec 2012 18:31:51 -0800 Subject: Improved jekyll and scala docs. Made many classes and method private to remove them from scala docs. --- .../main/scala/spark/streaming/Checkpoint.scala | 5 +- .../src/main/scala/spark/streaming/DStream.scala | 249 +++++++++++++-------- .../scala/spark/streaming/FlumeInputDStream.scala | 2 +- .../src/main/scala/spark/streaming/Interval.scala | 1 + streaming/src/main/scala/spark/streaming/Job.scala | 2 + .../main/scala/spark/streaming/JobManager.scala | 1 + .../spark/streaming/NetworkInputDStream.scala | 8 +- .../spark/streaming/NetworkInputTracker.scala | 8 +- .../spark/streaming/PairDStreamFunctions.scala | 4 +- .../src/main/scala/spark/streaming/Scheduler.scala | 7 +- .../scala/spark/streaming/StreamingContext.scala | 43 ++-- .../scala/spark/streaming/examples/GrepRaw.scala | 2 +- .../streaming/examples/TopKWordCountRaw.scala | 2 +- .../spark/streaming/examples/WordCountRaw.scala | 2 +- .../examples/clickstream/PageViewStream.scala | 2 +- .../test/scala/spark/streaming/TestSuiteBase.scala | 2 +- 16 files changed, 201 insertions(+), 139 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 770f7b0cc0..11a7232d7b 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration import java.io._ +private[streaming] class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { val master = ssc.sc.master @@ -30,6 +31,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) /** * Convenience class to speed up the writing of graph checkpoint to file */ +private[streaming] class CheckpointWriter(checkpointDir: String) extends Logging { val file = new Path(checkpointDir, "graph") val conf = new Configuration() @@ -65,7 +67,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } - +private[streaming] object CheckpointReader extends Logging { def read(path: String): Checkpoint = { @@ -103,6 +105,7 @@ object CheckpointReader extends Logging { } } +private[streaming] class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) { override def resolveClass(desc: ObjectStreamClass): Class[_] = { try { diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index d5048aeed7..3834b57ed3 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] * for more details on RDDs). DStreams can either be created from live data (such as, data from - * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each * DStream periodically generates a RDD, either from live data or by transforming the RDD generated * by a parent DStream. @@ -38,33 +38,28 @@ import org.apache.hadoop.conf.Configuration * - A function that is used to generate an RDD after each time interval */ -case class DStreamCheckpointData(rdds: HashMap[Time, Any]) - -abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext) -extends Serializable with Logging { +abstract class DStream[T: ClassManifest] ( + @transient protected[streaming] var ssc: StreamingContext + ) extends Serializable with Logging { initLogging() - /** - * ---------------------------------------------- - * Methods that must be implemented by subclasses - * ---------------------------------------------- - */ + // ======================================================================= + // Methods that should be implemented by subclasses of DStream + // ======================================================================= - // Time interval at which the DStream generates an RDD + /** Time interval after which the DStream generates a RDD */ def slideTime: Time - // List of parent DStreams on which this DStream depends on + /** List of parent DStreams on which this DStream depends on */ def dependencies: List[DStream[_]] - // Key method that computes RDD for a valid time + /** Method that generates a RDD for the given time */ def compute (validTime: Time): Option[RDD[T]] - /** - * --------------------------------------- - * Other general fields and methods of DStream - * --------------------------------------- - */ + // ======================================================================= + // Methods and fields available on all DStreams + // ======================================================================= // RDDs generated, marked as protected[streaming] so that testsuites can access it @transient @@ -87,12 +82,15 @@ extends Serializable with Logging { // Reference to whole DStream graph protected[streaming] var graph: DStreamGraph = null - def isInitialized = (zeroTime != null) + protected[streaming] def isInitialized = (zeroTime != null) // Duration for which the DStream requires its parent DStream to remember each RDD created - def parentRememberDuration = rememberDuration + protected[streaming] def parentRememberDuration = rememberDuration + + /** Returns the StreamingContext associated with this DStream */ + def context() = ssc - // Set caching level for the RDDs created by this DStream + /** Persists the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { if (this.isInitialized) { throw new UnsupportedOperationException( @@ -102,11 +100,16 @@ extends Serializable with Logging { this } + /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) - - // Turn on the default caching level for this RDD + + /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): DStream[T] = persist() + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ def checkpoint(interval: Time): DStream[T] = { if (isInitialized) { throw new UnsupportedOperationException( @@ -285,7 +288,7 @@ extends Serializable with Logging { * Generates a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this - * (eg. PerRDDForEachDStream). + * (eg. ForEachDStream). */ protected[streaming] def generateJob(time: Time): Option[Job] = { getOrCompute(time) match { @@ -420,65 +423,96 @@ extends Serializable with Logging { generatedRDDs = new HashMap[Time, RDD[T]] () } - /** - * -------------- - * DStream operations - * -------------- - */ + // ======================================================================= + // DStream operations + // ======================================================================= + + /** Returns a new DStream by applying a function to all elements of this DStream. */ def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { new MappedDStream(this, ssc.sc.clean(mapFunc)) } + /** + * Returns a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) } + /** Returns a new DStream containing only the elements that satisfy a predicate. */ def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ def glom(): DStream[Array[T]] = new GlommedDStream(this) - def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U] = { - new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc)) + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U: ClassManifest]( + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean = false + ): DStream[U] = { + new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning) } - def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + /** + * Returns a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(reduceFunc: (T, T) => T): DStream[T] = + this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + /** + * Returns a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _) - - def collect(): DStream[Seq[T]] = this.map(x => (null, x)).groupByKey(1).map(_._2) - - def foreach(foreachFunc: T => Unit) { - val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc)) - ssc.registerOutputStream(newStream) - newStream - } - def foreachRDD(foreachFunc: RDD[T] => Unit) { - foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) + /** + * Applies a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: RDD[T] => Unit) { + foreach((r: RDD[T], t: Time) => foreachFunc(r)) } - def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc)) + /** + * Applies a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: (RDD[T], Time) => Unit) { + val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) ssc.registerOutputStream(newStream) newStream } - def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transformRDD((r: RDD[T], t: Time) => transformFunc(r)) + /** + * Returns a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + transform((r: RDD[T], t: Time) => transformFunc(r)) } - def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + /** + * Returns a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { new TransformedDStream(this, ssc.sc.clean(transformFunc)) } - def toBlockingQueue() = { - val queue = new ArrayBlockingQueue[RDD[T]](10000) - this.foreachRDD(rdd => { - queue.add(rdd) - }) - queue - } - + /** + * Prints the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ def print() { def foreachFunc = (rdd: RDD[T], time: Time) => { val first11 = rdd.take(11) @@ -489,18 +523,42 @@ extends Serializable with Logging { if (first11.size > 10) println("...") println() } - val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc)) + val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) ssc.registerOutputStream(newStream) } + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowTime width of the window; must be a multiple of this DStream's interval. + * @return + */ def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime) + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * @param windowTime duration (i.e., width) of the window; + * must be a multiple of this DStream's interval + * @param slideTime sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's interval + */ def window(windowTime: Time, slideTime: Time): DStream[T] = { new WindowedDStream(this, windowTime, slideTime) } + /** + * Returns a new DStream which computed based on tumbling window on this DStream. + * This is equivalent to window(batchTime, batchTime). + * @param batchTime tumbling window duration; must be a multiple of this DStream's interval + */ def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime) + /** + * Returns a new DStream in which each RDD has a single element generated by reducing all + * elements in a window over this DStream. windowTime and slideTime are as defined in the + * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc) + */ def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = { this.window(windowTime, slideTime).reduce(reduceFunc) } @@ -516,17 +574,31 @@ extends Serializable with Logging { .map(_._2) } + /** + * Returns a new DStream in which each RDD has a single element generated by counting the number + * of elements in a window over this DStream. windowTime and slideTime are as defined in the + * window() operation. This is equivalent to window(windowTime, slideTime).count() + */ def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = { this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime) } + /** + * Returns a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideTime) as this DStream. + */ def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) - def slice(interval: Interval): Seq[RDD[T]] = { + /** + * Returns all the RDDs defined by the Interval object (both end times included) + */ + protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = { slice(interval.beginTime, interval.endTime) } - // Get all the RDDs between fromTime to toTime (both included) + /** + * Returns all the RDDs between 'fromTime' to 'toTime' (both included) + */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() var time = toTime.floor(slideTime) @@ -540,20 +612,26 @@ extends Serializable with Logging { rdds.toSeq } + /** + * Saves each RDD in this DStream as a Sequence file of serialized objects. + */ def saveAsObjectFiles(prefix: String, suffix: String = "") { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) } - this.foreachRDD(saveFunc) + this.foreach(saveFunc) } + /** + * Saves each RDD in this DStream as at text file, using string representation of elements. + */ def saveAsTextFiles(prefix: String, suffix: String = "") { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } - this.foreachRDD(saveFunc) + this.foreach(saveFunc) } def register() { @@ -561,6 +639,8 @@ extends Serializable with Logging { } } +private[streaming] +case class DStreamCheckpointData(rdds: HashMap[Time, Any]) abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { @@ -583,6 +663,7 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex * TODO */ +private[streaming] class MappedDStream[T: ClassManifest, U: ClassManifest] ( parent: DStream[T], mapFunc: T => U @@ -602,6 +683,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] ( * TODO */ +private[streaming] class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( parent: DStream[T], flatMapFunc: T => Traversable[U] @@ -621,6 +703,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( * TODO */ +private[streaming] class FilteredDStream[T: ClassManifest]( parent: DStream[T], filterFunc: T => Boolean @@ -640,9 +723,11 @@ class FilteredDStream[T: ClassManifest]( * TODO */ +private[streaming] class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( parent: DStream[T], - mapPartFunc: Iterator[T] => Iterator[U] + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean ) extends DStream[U](parent.ssc) { override def dependencies = List(parent) @@ -650,7 +735,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( override def slideTime: Time = parent.slideTime override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc)) + parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) } } @@ -659,6 +744,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( * TODO */ +private[streaming] class GlommedDStream[T: ClassManifest](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { @@ -676,6 +762,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) * TODO */ +private[streaming] class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( parent: DStream[(K,V)], createCombiner: V => C, @@ -702,6 +789,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( * TODO */ +private[streaming] class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( parent: DStream[(K, V)], mapValueFunc: V => U @@ -720,7 +808,7 @@ class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( /** * TODO */ - +private[streaming] class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( parent: DStream[(K, V)], flatMapValueFunc: V => TraversableOnce[U] @@ -779,38 +867,8 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) * TODO */ -class PerElementForEachDStream[T: ClassManifest] ( - parent: DStream[T], - foreachFunc: T => Unit - ) extends DStream[Unit](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[Unit]] = None - - override def generateJob(time: Time): Option[Job] = { - parent.getOrCompute(time) match { - case Some(rdd) => - val jobFunc = () => { - val sparkJobFunc = { - (iterator: Iterator[T]) => iterator.foreach(foreachFunc) - } - ssc.sc.runJob(rdd, sparkJobFunc) - } - Some(new Job(time, jobFunc)) - case None => None - } - } -} - - -/** - * TODO - */ - -class PerRDDForEachDStream[T: ClassManifest] ( +private[streaming] +class ForEachDStream[T: ClassManifest] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { @@ -838,6 +896,7 @@ class PerRDDForEachDStream[T: ClassManifest] ( * TODO */ +private[streaming] class TransformedDStream[T: ClassManifest, U: ClassManifest] ( parent: DStream[T], transformFunc: (RDD[T], Time) => RDD[U] diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala index 2959ce4540..5ac7e5b08e 100644 --- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala @@ -79,7 +79,7 @@ class SparkFlumeEvent() extends Externalizable { } } -object SparkFlumeEvent { +private[streaming] object SparkFlumeEvent { def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { val event = new SparkFlumeEvent event.event = in diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index ffb7725ac9..fa0b7ce19d 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -1,5 +1,6 @@ package spark.streaming +private[streaming] case class Interval(beginTime: Time, endTime: Time) { def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs)) diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala index 0bcb6fd8dc..67bd8388bc 100644 --- a/streaming/src/main/scala/spark/streaming/Job.scala +++ b/streaming/src/main/scala/spark/streaming/Job.scala @@ -2,6 +2,7 @@ package spark.streaming import java.util.concurrent.atomic.AtomicLong +private[streaming] class Job(val time: Time, func: () => _) { val id = Job.getNewId() def run(): Long = { @@ -14,6 +15,7 @@ class Job(val time: Time, func: () => _) { override def toString = "streaming job " + id + " @ " + time } +private[streaming] object Job { val id = new AtomicLong(0) diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 9bf9251519..fda7264a27 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -5,6 +5,7 @@ import spark.SparkEnv import java.util.concurrent.Executors +private[streaming] class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { class JobHandler(ssc: StreamingContext, job: Job) extends Runnable { diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala index 4e4e9fc942..4bf13dd50c 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala @@ -40,10 +40,10 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming } -sealed trait NetworkReceiverMessage -case class StopReceiver(msg: String) extends NetworkReceiverMessage -case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage -case class ReportError(msg: String) extends NetworkReceiverMessage +private[streaming] sealed trait NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging { diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index b421f795ee..658498dfc1 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -11,10 +11,10 @@ import akka.pattern.ask import akka.util.duration._ import akka.dispatch._ -trait NetworkInputTrackerMessage -case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage -case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage +private[streaming] sealed trait NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage class NetworkInputTracker( diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 720e63bba0..f9fef14196 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -281,7 +281,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreachRDD(saveFunc) + self.foreach(saveFunc) } def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( @@ -303,7 +303,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreachRDD(saveFunc) + self.foreach(saveFunc) } private def getKeyClass() = implicitly[ClassManifest[K]].erasure diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 014021be61..fd1fa77a24 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -7,11 +7,8 @@ import spark.Logging import scala.collection.mutable.HashMap -sealed trait SchedulerMessage -case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage - -class Scheduler(ssc: StreamingContext) -extends Logging { +private[streaming] +class Scheduler(ssc: StreamingContext) extends Logging { initLogging() diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index ce47bcb2da..998fea849f 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -48,7 +48,7 @@ class StreamingContext private ( this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** - * Recreates the StreamingContext from a checkpoint file. + * Re-creates a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or * to the checkpoint file 'graph' or 'graph.bk'. */ @@ -61,7 +61,7 @@ class StreamingContext private ( "both SparkContext and checkpoint as null") } - val isCheckpointPresent = (cp_ != null) + protected[streaming] val isCheckpointPresent = (cp_ != null) val sc: SparkContext = { if (isCheckpointPresent) { @@ -71,9 +71,9 @@ class StreamingContext private ( } } - val env = SparkEnv.get + protected[streaming] val env = SparkEnv.get - val graph: DStreamGraph = { + protected[streaming] val graph: DStreamGraph = { if (isCheckpointPresent) { cp_.graph.setContext(this) cp_.graph.restoreCheckpointData() @@ -86,10 +86,10 @@ class StreamingContext private ( } } - private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0) - private[streaming] var networkInputTracker: NetworkInputTracker = null + protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0) + protected[streaming] var networkInputTracker: NetworkInputTracker = null - private[streaming] var checkpointDir: String = { + protected[streaming] var checkpointDir: String = { if (isCheckpointPresent) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true) cp_.checkpointDir @@ -98,9 +98,9 @@ class StreamingContext private ( } } - private[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null - private[streaming] var receiverJobThread: Thread = null - private[streaming] var scheduler: Scheduler = null + protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null + protected[streaming] var receiverJobThread: Thread = null + protected[streaming] var scheduler: Scheduler = null def remember(duration: Time) { graph.remember(duration) @@ -117,11 +117,11 @@ class StreamingContext private ( } } - private[streaming] def getInitialCheckpoint(): Checkpoint = { + protected[streaming] def getInitialCheckpoint(): Checkpoint = { if (isCheckpointPresent) cp_ else null } - private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() + protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() /** * Create an input stream that pulls messages form a Kafka Broker. @@ -188,7 +188,7 @@ class StreamingContext private ( } /** - * This function creates a input stream that monitors a Hadoop-compatible filesystem + * Creates a input stream that monitors a Hadoop-compatible filesystem * for new files and executes the necessary processing on them. */ def fileStream[ @@ -206,7 +206,7 @@ class StreamingContext private ( } /** - * This function create a input stream from an queue of RDDs. In each batch, + * Creates a input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue */ def queueStream[T: ClassManifest]( @@ -231,22 +231,21 @@ class StreamingContext private ( } /** - * This function registers a InputDStream as an input stream that will be - * started (InputDStream.start() called) to get the input data streams. + * Registers an input stream that will be started (InputDStream.start() called) to get the + * input data. */ def registerInputStream(inputStream: InputDStream[_]) { graph.addInputStream(inputStream) } /** - * This function registers a DStream as an output stream that will be - * computed every interval. + * Registers an output stream that will be computed every interval */ def registerOutputStream(outputStream: DStream[_]) { graph.addOutputStream(outputStream) } - def validate() { + protected def validate() { assert(graph != null, "Graph is null") graph.validate() @@ -304,7 +303,7 @@ class StreamingContext private ( object StreamingContext { - def createNewSparkContext(master: String, frameworkName: String): SparkContext = { + protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = { // Set the default cleaner delay to an hour if not already set. // This should be sufficient for even 1 second interval. @@ -318,7 +317,7 @@ object StreamingContext { new PairDStreamFunctions[K, V](stream) } - def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { + protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { if (prefix == null) { time.millis.toString } else if (suffix == null || suffix.length ==0) { @@ -328,7 +327,7 @@ object StreamingContext { } } - def getSparkCheckpointDir(sscCheckpointDir: String): String = { + protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = { new Path(sscCheckpointDir, UUID.randomUUID.toString).toString } } diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index 6cb2b4c042..7c4ee3b34c 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -26,7 +26,7 @@ object GrepRaw { val rawStreams = (1 to numStreams).map(_ => ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray val union = new UnionDStream(rawStreams) - union.filter(_.contains("Alice")).count().foreachRDD(r => + union.filter(_.contains("Alice")).count().foreach(r => println("Grep count: " + r.collect().mkString)) ssc.start() } diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index fe4c2bf155..182dfd8a52 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -38,7 +38,7 @@ object TopKWordCountRaw { val counts = union.mapPartitions(splitAndCountPartitions) val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) - partialTopKWindowedCounts.foreachRDD(rdd => { + partialTopKWindowedCounts.foreach(rdd => { val collectedCounts = rdd.collect println("Collected " + collectedCounts.size + " words from partial top words") println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(",")) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index a29c81d437..9bcd30f4d7 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -36,7 +36,7 @@ object WordCountRaw { val union = new UnionDStream(lines.toArray) val counts = union.mapPartitions(splitAndCountPartitions) val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) - windowedCounts.foreachRDD(r => println("# unique words = " + r.count())) + windowedCounts.foreach(r => println("# unique words = " + r.count())) ssc.start() } diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 68be6b7893..a191321d91 100644 --- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -72,7 +72,7 @@ object PageViewStream { case "popularUsersSeen" => // Look for users in our existing dataset and print it out if we have a match pageViews.map(view => (view.userID, 1)) - .foreachRDD((rdd, time) => rdd.join(userList) + .foreach((rdd, time) => rdd.join(userList) .map(_._2._2) .take(10) .foreach(u => println("Saw user %s at time %s".format(u, time)))) diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 8cc2f8ccfc..a44f738957 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -35,7 +35,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. */ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) - extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected }) { -- cgit v1.2.3 From 7e0271b4387eaf27cd96f3057ce2465b1271a480 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 30 Dec 2012 15:19:55 -0800 Subject: Refactored a whole lot to push all DStreams into the spark.streaming.dstream package. --- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 1 + .../scala/spark/streaming/CoGroupedDStream.scala | 38 --- .../spark/streaming/ConstantInputDStream.scala | 18 -- .../src/main/scala/spark/streaming/DStream.scala | 276 +-------------------- .../main/scala/spark/streaming/DStreamGraph.scala | 1 + .../main/scala/spark/streaming/DataHandler.scala | 83 ------- .../scala/spark/streaming/FileInputDStream.scala | 109 -------- .../scala/spark/streaming/FlumeInputDStream.scala | 130 ---------- .../spark/streaming/NetworkInputDStream.scala | 156 ------------ .../spark/streaming/NetworkInputTracker.scala | 2 + .../spark/streaming/PairDStreamFunctions.scala | 7 +- .../scala/spark/streaming/QueueInputDStream.scala | 40 --- .../scala/spark/streaming/RawInputDStream.scala | 85 ------- .../spark/streaming/ReducedWindowedDStream.scala | 149 ----------- .../src/main/scala/spark/streaming/Scheduler.scala | 3 - .../scala/spark/streaming/SocketInputDStream.scala | 107 -------- .../main/scala/spark/streaming/StateDStream.scala | 84 ------- .../scala/spark/streaming/StreamingContext.scala | 13 +- .../src/main/scala/spark/streaming/Time.scala | 11 +- .../scala/spark/streaming/WindowedDStream.scala | 39 --- .../spark/streaming/dstream/CoGroupedDStream.scala | 39 +++ .../streaming/dstream/ConstantInputDStream.scala | 19 ++ .../spark/streaming/dstream/DataHandler.scala | 83 +++++++ .../spark/streaming/dstream/FileInputDStream.scala | 110 ++++++++ .../spark/streaming/dstream/FilteredDStream.scala | 21 ++ .../streaming/dstream/FlatMapValuedDStream.scala | 20 ++ .../streaming/dstream/FlatMappedDStream.scala | 20 ++ .../streaming/dstream/FlumeInputDStream.scala | 135 ++++++++++ .../spark/streaming/dstream/ForEachDStream.scala | 28 +++ .../spark/streaming/dstream/GlommedDStream.scala | 17 ++ .../spark/streaming/dstream/InputDStream.scala | 19 ++ .../streaming/dstream/KafkaInputDStream.scala | 197 +++++++++++++++ .../streaming/dstream/MapPartitionedDStream.scala | 21 ++ .../spark/streaming/dstream/MapValuedDStream.scala | 21 ++ .../spark/streaming/dstream/MappedDStream.scala | 20 ++ .../streaming/dstream/NetworkInputDStream.scala | 157 ++++++++++++ .../streaming/dstream/QueueInputDStream.scala | 41 +++ .../spark/streaming/dstream/RawInputDStream.scala | 88 +++++++ .../streaming/dstream/ReducedWindowedDStream.scala | 148 +++++++++++ .../spark/streaming/dstream/ShuffledDStream.scala | 27 ++ .../streaming/dstream/SocketInputDStream.scala | 103 ++++++++ .../spark/streaming/dstream/StateDStream.scala | 83 +++++++ .../streaming/dstream/TransformedDStream.scala | 19 ++ .../spark/streaming/dstream/UnionDStream.scala | 39 +++ .../spark/streaming/dstream/WindowedDStream.scala | 40 +++ .../scala/spark/streaming/examples/GrepRaw.scala | 2 +- .../streaming/examples/TopKWordCountRaw.scala | 2 +- .../spark/streaming/examples/WordCountRaw.scala | 2 +- .../spark/streaming/input/KafkaInputDStream.scala | 193 -------------- .../scala/spark/streaming/CheckpointSuite.scala | 2 +- .../test/scala/spark/streaming/FailureSuite.scala | 2 +- .../scala/spark/streaming/InputStreamsSuite.scala | 1 + .../test/scala/spark/streaming/TestSuiteBase.scala | 48 +++- .../spark/streaming/WindowOperationsSuite.scala | 12 +- 54 files changed, 1600 insertions(+), 1531 deletions(-) delete mode 100644 streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/DataHandler.scala delete mode 100644 streaming/src/main/scala/spark/streaming/FileInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/QueueInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/RawInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/SocketInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/StateDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/WindowedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala (limited to 'streaming') diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index f40b56be64..1b219473e0 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -1,6 +1,7 @@ package spark.rdd import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext} +import spark.SparkContext._ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala deleted file mode 100644 index 61d088eddb..0000000000 --- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala +++ /dev/null @@ -1,38 +0,0 @@ -package spark.streaming - -import spark.{RDD, Partitioner} -import spark.rdd.CoGroupedRDD - -class CoGroupedDStream[K : ClassManifest]( - parents: Seq[DStream[(_, _)]], - partitioner: Partitioner - ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideTime).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideTime = parents.head.slideTime - - override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { - val part = partitioner - val rdds = parents.flatMap(_.getOrCompute(validTime)) - if (rdds.size > 0) { - val q = new CoGroupedRDD[K](rdds, part) - Some(q) - } else { - None - } - } - -} diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala deleted file mode 100644 index 80150708fd..0000000000 --- a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala +++ /dev/null @@ -1,18 +0,0 @@ -package spark.streaming - -import spark.RDD - -/** - * An input stream that always returns the same RDD on each timestep. Useful for testing. - */ -class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) - extends InputDStream[T](ssc_) { - - override def start() {} - - override def stop() {} - - override def compute(validTime: Time): Option[RDD[T]] = { - Some(rdd) - } -} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 3834b57ed3..292ad3b9f9 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -1,17 +1,15 @@ package spark.streaming +import spark.streaming.dstream._ import StreamingContext._ import Time._ -import spark._ -import spark.SparkContext._ -import spark.rdd._ +import spark.{RDD, Logging} import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import java.util.concurrent.ArrayBlockingQueue import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.hadoop.fs.Path @@ -197,7 +195,7 @@ abstract class DStream[T: ClassManifest] ( "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" + "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " + "the Java property 'spark.cleaner.delay' to more than " + - math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes." + math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes." ) dependencies.foreach(_.validate()) @@ -642,271 +640,3 @@ abstract class DStream[T: ClassManifest] ( private[streaming] case class DStreamCheckpointData(rdds: HashMap[Time, Any]) -abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) - extends DStream[T](ssc_) { - - override def dependencies = List() - - override def slideTime = { - if (ssc == null) throw new Exception("ssc is null") - if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") - ssc.graph.batchDuration - } - - def start() - - def stop() -} - - -/** - * TODO - */ - -private[streaming] -class MappedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - mapFunc: T => U - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.map[U](mapFunc)) - } -} - - -/** - * TODO - */ - -private[streaming] -class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( - parent: DStream[T], - flatMapFunc: T => Traversable[U] - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) - } -} - - -/** - * TODO - */ - -private[streaming] -class FilteredDStream[T: ClassManifest]( - parent: DStream[T], - filterFunc: T => Boolean - ) extends DStream[T](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[T]] = { - parent.getOrCompute(validTime).map(_.filter(filterFunc)) - } -} - - -/** - * TODO - */ - -private[streaming] -class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( - parent: DStream[T], - mapPartFunc: Iterator[T] => Iterator[U], - preservePartitioning: Boolean - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) - } -} - - -/** - * TODO - */ - -private[streaming] -class GlommedDStream[T: ClassManifest](parent: DStream[T]) - extends DStream[Array[T]](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[Array[T]]] = { - parent.getOrCompute(validTime).map(_.glom()) - } -} - - -/** - * TODO - */ - -private[streaming] -class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( - parent: DStream[(K,V)], - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiner: (C, C) => C, - partitioner: Partitioner - ) extends DStream [(K,C)] (parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K,C)]] = { - parent.getOrCompute(validTime) match { - case Some(rdd) => - Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) - case None => None - } - } -} - - -/** - * TODO - */ - -private[streaming] -class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( - parent: DStream[(K, V)], - mapValueFunc: V => U - ) extends DStream[(K, U)](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K, U)]] = { - parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) - } -} - - -/** - * TODO - */ -private[streaming] -class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( - parent: DStream[(K, V)], - flatMapValueFunc: V => TraversableOnce[U] - ) extends DStream[(K, U)](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K, U)]] = { - parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) - } -} - - - -/** - * TODO - */ - -class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) - extends DStream[T](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideTime).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideTime: Time = parents.head.slideTime - - override def compute(validTime: Time): Option[RDD[T]] = { - val rdds = new ArrayBuffer[RDD[T]]() - parents.map(_.getOrCompute(validTime)).foreach(_ match { - case Some(rdd) => rdds += rdd - case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) - }) - if (rdds.size > 0) { - Some(new UnionRDD(ssc.sc, rdds)) - } else { - None - } - } -} - - -/** - * TODO - */ - -private[streaming] -class ForEachDStream[T: ClassManifest] ( - parent: DStream[T], - foreachFunc: (RDD[T], Time) => Unit - ) extends DStream[Unit](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[Unit]] = None - - override def generateJob(time: Time): Option[Job] = { - parent.getOrCompute(time) match { - case Some(rdd) => - val jobFunc = () => { - foreachFunc(rdd, time) - } - Some(new Job(time, jobFunc)) - case None => None - } - } -} - - -/** - * TODO - */ - -private[streaming] -class TransformedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - transformFunc: (RDD[T], Time) => RDD[U] - ) extends DStream[U](parent.ssc) { - - override def dependencies = List(parent) - - override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(transformFunc(_, validTime)) - } -} diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index d0a9ade61d..c72429370e 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -1,5 +1,6 @@ package spark.streaming +import dstream.InputDStream import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import collection.mutable.ArrayBuffer import spark.Logging diff --git a/streaming/src/main/scala/spark/streaming/DataHandler.scala b/streaming/src/main/scala/spark/streaming/DataHandler.scala deleted file mode 100644 index 05f307a8d1..0000000000 --- a/streaming/src/main/scala/spark/streaming/DataHandler.scala +++ /dev/null @@ -1,83 +0,0 @@ -package spark.streaming - -import java.util.concurrent.ArrayBlockingQueue -import scala.collection.mutable.ArrayBuffer -import spark.Logging -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - - -/** - * This is a helper object that manages the data received from the socket. It divides - * the object received into small batches of 100s of milliseconds, pushes them as - * blocks into the block manager and reports the block IDs to the network input - * tracker. It starts two threads, one to periodically start a new batch and prepare - * the previous batch of as a block, the other to push the blocks into the block - * manager. - */ - class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel) - extends Serializable with Logging { - - case class Block(id: String, iterator: Iterator[T], metadata: Any = null) - - val clock = new SystemClock() - val blockInterval = 200L - val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) - val blockStorageLevel = storageLevel - val blocksForPushing = new ArrayBlockingQueue[Block](1000) - val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - - var currentBuffer = new ArrayBuffer[T] - - def createBlock(blockId: String, iterator: Iterator[T]) : Block = { - new Block(blockId, iterator) - } - - def start() { - blockIntervalTimer.start() - blockPushingThread.start() - logInfo("Data handler started") - } - - def stop() { - blockIntervalTimer.stop() - blockPushingThread.interrupt() - logInfo("Data handler stopped") - } - - def += (obj: T) { - currentBuffer += obj - } - - def updateCurrentBuffer(time: Long) { - try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[T] - if (newBlockBuffer.size > 0) { - val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval) - val newBlock = createBlock(blockId, newBlockBuffer.toIterator) - blocksForPushing.add(newBlock) - } - } catch { - case ie: InterruptedException => - logInfo("Block interval timer thread interrupted") - case e: Exception => - receiver.stop() - } - } - - def keepPushingBlocks() { - logInfo("Block pushing thread started") - try { - while(true) { - val block = blocksForPushing.take() - receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel) - } - } catch { - case ie: InterruptedException => - logInfo("Block pushing thread interrupted") - case e: Exception => - receiver.stop() - } - } - } \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala deleted file mode 100644 index 88856364d2..0000000000 --- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala +++ /dev/null @@ -1,109 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.UnionRDD - -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} - -import scala.collection.mutable.HashSet - - -class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( - @transient ssc_ : StreamingContext, - directory: String, - filter: PathFilter = FileInputDStream.defaultPathFilter, - newFilesOnly: Boolean = true) - extends InputDStream[(K, V)](ssc_) { - - @transient private var path_ : Path = null - @transient private var fs_ : FileSystem = null - - var lastModTime = 0L - val lastModTimeFiles = new HashSet[String]() - - def path(): Path = { - if (path_ == null) path_ = new Path(directory) - path_ - } - - def fs(): FileSystem = { - if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) - fs_ - } - - override def start() { - if (newFilesOnly) { - lastModTime = System.currentTimeMillis() - } else { - lastModTime = 0 - } - } - - override def stop() { } - - /** - * Finds the files that were modified since the last time this method was called and makes - * a union RDD out of them. Note that this maintains the list of files that were processed - * in the latest modification time in the previous call to this method. This is because the - * modification time returned by the FileStatus API seems to return times only at the - * granularity of seconds. Hence, new files may have the same modification time as the - * latest modification time in the previous call to this method and the list of files - * maintained is used to filter the one that have been processed. - */ - override def compute(validTime: Time): Option[RDD[(K, V)]] = { - // Create the filter for selecting new files - val newFilter = new PathFilter() { - var latestModTime = 0L - val latestModTimeFiles = new HashSet[String]() - - def accept(path: Path): Boolean = { - if (!filter.accept(path)) { - return false - } else { - val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime < lastModTime){ - return false - } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { - return false - } - if (modTime > latestModTime) { - latestModTime = modTime - latestModTimeFiles.clear() - } - latestModTimeFiles += path.toString - return true - } - } - } - - val newFiles = fs.listStatus(path, newFilter) - logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) - if (newFiles.length > 0) { - // Update the modification time and the files processed for that modification time - if (lastModTime != newFilter.latestModTime) { - lastModTime = newFilter.latestModTime - lastModTimeFiles.clear() - } - lastModTimeFiles ++= newFilter.latestModTimeFiles - } - val newRDD = new UnionRDD(ssc.sc, newFiles.map( - file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) - Some(newRDD) - } -} - -object FileInputDStream { - val defaultPathFilter = new PathFilter with Serializable { - def accept(path: Path): Boolean = { - val file = path.getName() - if (file.startsWith(".") || file.endsWith("_tmp")) { - return false - } else { - return true - } - } - } -} - diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala deleted file mode 100644 index 5ac7e5b08e..0000000000 --- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala +++ /dev/null @@ -1,130 +0,0 @@ -package spark.streaming - -import java.io.{ObjectInput, ObjectOutput, Externalizable} -import spark.storage.StorageLevel -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.specific.SpecificResponder -import org.apache.avro.ipc.NettyServer -import java.net.InetSocketAddress -import collection.JavaConversions._ -import spark.Utils -import java.nio.ByteBuffer - -class FlumeInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { - - override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { - new FlumeReceiver(id, host, port, storageLevel) - } -} - -/** - * A wrapper class for AvroFlumeEvent's with a custom serialization format. - * - * This is necessary because AvroFlumeEvent uses inner data structures - * which are not serializable. - */ -class SparkFlumeEvent() extends Externalizable { - var event : AvroFlumeEvent = new AvroFlumeEvent() - - /* De-serialize from bytes. */ - def readExternal(in: ObjectInput) { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) - val key : String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.read(valBuff) - val value : String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - - event.setBody(ByteBuffer.wrap(bodyBuff)) - event.setHeaders(headers) - } - - /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput) { - val body = event.getBody.array() - out.writeInt(body.length) - out.write(body) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} - -private[streaming] object SparkFlumeEvent { - def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { - val event = new SparkFlumeEvent - event.event = in - event - } -} - -/** A simple server that implements Flume's Avro protocol. */ -class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { - override def append(event : AvroFlumeEvent) : Status = { - receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) - Status.OK - } - - override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { - events.foreach (event => - receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)) - Status.OK - } -} - -/** A NetworkReceiver which listens for events using the - * Flume Avro interface.*/ -class FlumeReceiver( - streamId: Int, - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent](streamId) { - - lazy val dataHandler = new DataHandler(this, storageLevel) - - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)); - val server = new NettyServer(responder, new InetSocketAddress(host, port)); - dataHandler.start() - server.start() - logInfo("Flume receiver started") - } - - protected override def onStop() { - dataHandler.stop() - logInfo("Flume receiver stopped") - } - - override def getLocationPreference = Some(host) -} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala deleted file mode 100644 index 4bf13dd50c..0000000000 --- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala +++ /dev/null @@ -1,156 +0,0 @@ -package spark.streaming - -import scala.collection.mutable.ArrayBuffer - -import spark.{Logging, SparkEnv, RDD} -import spark.rdd.BlockRDD -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - -import java.nio.ByteBuffer - -import akka.actor.{Props, Actor} -import akka.pattern.ask -import akka.dispatch.Await -import akka.util.duration._ - -abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) - extends InputDStream[T](ssc_) { - - // This is an unique identifier that is used to match the network receiver with the - // corresponding network input stream. - val id = ssc.getNewNetworkStreamId() - - /** - * This method creates the receiver object that will be sent to the workers - * to receive data. This method needs to defined by any specific implementation - * of a NetworkInputDStream. - */ - def createReceiver(): NetworkReceiver[T] - - // Nothing to start or stop as both taken care of by the NetworkInputTracker. - def start() {} - - def stop() {} - - override def compute(validTime: Time): Option[RDD[T]] = { - val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) - Some(new BlockRDD[T](ssc.sc, blockIds)) - } -} - - -private[streaming] sealed trait NetworkReceiverMessage -private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage -private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage - -abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging { - - initLogging() - - lazy protected val env = SparkEnv.get - - lazy protected val actor = env.actorSystem.actorOf( - Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) - - lazy protected val receivingThread = Thread.currentThread() - - /** This method will be called to start receiving data. */ - protected def onStart() - - /** This method will be called to stop receiving data. */ - protected def onStop() - - /** This method conveys a placement preference (hostname) for this receiver. */ - def getLocationPreference() : Option[String] = None - - /** - * This method starts the receiver. First is accesses all the lazy members to - * materialize them. Then it calls the user-defined onStart() method to start - * other threads, etc required to receiver the data. - */ - def start() { - try { - // Access the lazy vals to materialize them - env - actor - receivingThread - - // Call user-defined onStart() - onStart() - } catch { - case ie: InterruptedException => - logInfo("Receiving thread interrupted") - //println("Receiving thread interrupted") - case e: Exception => - stopOnError(e) - } - } - - /** - * This method stops the receiver. First it interrupts the main receiving thread, - * that is, the thread that called receiver.start(). Then it calls the user-defined - * onStop() method to stop other threads and/or do cleanup. - */ - def stop() { - receivingThread.interrupt() - onStop() - //TODO: terminate the actor - } - - /** - * This method stops the receiver and reports to exception to the tracker. - * This should be called whenever an exception has happened on any thread - * of the receiver. - */ - protected def stopOnError(e: Exception) { - logError("Error receiving data", e) - stop() - actor ! ReportError(e.toString) - } - - - /** - * This method pushes a block (as iterator of values) into the block manager. - */ - def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) { - val buffer = new ArrayBuffer[T] ++ iterator - env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level) - - actor ! ReportBlock(blockId, metadata) - } - - /** - * This method pushes a block (as bytes) into the block manager. - */ - def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { - env.blockManager.putBytes(blockId, bytes, level) - actor ! ReportBlock(blockId, metadata) - } - - /** A helper actor that communicates with the NetworkInputTracker */ - private class NetworkReceiverActor extends Actor { - logInfo("Attempting to register with tracker") - val ip = System.getProperty("spark.master.host", "localhost") - val port = System.getProperty("spark.master.port", "7077").toInt - val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) - val tracker = env.actorSystem.actorFor(url) - val timeout = 5.seconds - - override def preStart() { - val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) - Await.result(future, timeout) - } - - override def receive() = { - case ReportBlock(blockId, metadata) => - tracker ! AddBlocks(streamId, Array(blockId), metadata) - case ReportError(msg) => - tracker ! DeregisterReceiver(streamId, msg) - case StopReceiver(msg) => - stop() - tracker ! DeregisterReceiver(streamId, msg) - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index 658498dfc1..a6ab44271f 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -1,5 +1,7 @@ package spark.streaming +import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} +import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} import spark.Logging import spark.SparkEnv diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index f9fef14196..b0a208e67f 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -1,6 +1,9 @@ package spark.streaming import spark.streaming.StreamingContext._ +import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} +import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} +import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} import spark.{Manifests, RDD, Partitioner, HashPartitioner} import spark.SparkContext._ @@ -218,13 +221,13 @@ extends Serializable { def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { - new MapValuesDStream[K, V, U](self, mapValuesFunc) + new MapValuedDStream[K, V, U](self, mapValuesFunc) } def flatMapValues[U: ClassManifest]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { - new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc) + new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) } def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala deleted file mode 100644 index bb86e51932..0000000000 --- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala +++ /dev/null @@ -1,40 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.UnionRDD - -import scala.collection.mutable.Queue -import scala.collection.mutable.ArrayBuffer - -class QueueInputDStream[T: ClassManifest]( - @transient ssc: StreamingContext, - val queue: Queue[RDD[T]], - oneAtATime: Boolean, - defaultRDD: RDD[T] - ) extends InputDStream[T](ssc) { - - override def start() { } - - override def stop() { } - - override def compute(validTime: Time): Option[RDD[T]] = { - val buffer = new ArrayBuffer[RDD[T]]() - if (oneAtATime && queue.size > 0) { - buffer += queue.dequeue() - } else { - buffer ++= queue - } - if (buffer.size > 0) { - if (oneAtATime) { - Some(buffer.first) - } else { - Some(new UnionRDD(ssc.sc, buffer.toSeq)) - } - } else if (defaultRDD != null) { - Some(defaultRDD) - } else { - None - } - } - -} diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala deleted file mode 100644 index 6acaa9aab1..0000000000 --- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala +++ /dev/null @@ -1,85 +0,0 @@ -package spark.streaming - -import java.net.InetSocketAddress -import java.nio.ByteBuffer -import java.nio.channels.{ReadableByteChannel, SocketChannel} -import java.io.EOFException -import java.util.concurrent.ArrayBlockingQueue -import spark._ -import spark.storage.StorageLevel - -/** - * An input stream that reads blocks of serialized objects from a given network address. - * The blocks will be inserted directly into the block store. This is the fastest way to get - * data into Spark Streaming, though it requires the sender to batch data and serialize it - * in the format that the system is configured with. - */ -class RawInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { - - def createReceiver(): NetworkReceiver[T] = { - new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] - } -} - -class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel) - extends NetworkReceiver[Any](streamId) { - - var blockPushingThread: Thread = null - - override def getLocationPreference = None - - def onStart() { - // Open a socket to the target address and keep reading from it - logInfo("Connecting to " + host + ":" + port) - val channel = SocketChannel.open() - channel.configureBlocking(true) - channel.connect(new InetSocketAddress(host, port)) - logInfo("Connected to " + host + ":" + port) - - val queue = new ArrayBlockingQueue[ByteBuffer](2) - - blockPushingThread = new DaemonThread { - override def run() { - var nextBlockNumber = 0 - while (true) { - val buffer = queue.take() - val blockId = "input-" + streamId + "-" + nextBlockNumber - nextBlockNumber += 1 - pushBlock(blockId, buffer, null, storageLevel) - } - } - } - blockPushingThread.start() - - val lengthBuffer = ByteBuffer.allocate(4) - while (true) { - lengthBuffer.clear() - readFully(channel, lengthBuffer) - lengthBuffer.flip() - val length = lengthBuffer.getInt() - val dataBuffer = ByteBuffer.allocate(length) - readFully(channel, dataBuffer) - dataBuffer.flip() - logInfo("Read a block with " + length + " bytes") - queue.put(dataBuffer) - } - } - - def onStop() { - if (blockPushingThread != null) blockPushingThread.interrupt() - } - - /** Read a buffer fully from a given Channel */ - private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) { - while (dest.position < dest.limit) { - if (channel.read(dest) == -1) { - throw new EOFException("End of channel") - } - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala deleted file mode 100644 index f63a9e0011..0000000000 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ /dev/null @@ -1,149 +0,0 @@ -package spark.streaming - -import spark.streaming.StreamingContext._ - -import spark.RDD -import spark.rdd.UnionRDD -import spark.rdd.CoGroupedRDD -import spark.Partitioner -import spark.SparkContext._ -import spark.storage.StorageLevel - -import scala.collection.mutable.ArrayBuffer -import collection.SeqProxy - -class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( - parent: DStream[(K, V)], - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - _windowTime: Time, - _slideTime: Time, - partitioner: Partitioner - ) extends DStream[(K,V)](parent.ssc) { - - assert(_windowTime.isMultipleOf(parent.slideTime), - "The window duration of ReducedWindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" - ) - - assert(_slideTime.isMultipleOf(parent.slideTime), - "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" - ) - - // Reduce each batch of data using reduceByKey which will be further reduced by window - // by ReducedWindowedDStream - val reducedStream = parent.reduceByKey(reduceFunc, partitioner) - - // Persist RDDs to memory by default as these RDDs are going to be reused. - super.persist(StorageLevel.MEMORY_ONLY_SER) - reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) - - def windowTime: Time = _windowTime - - override def dependencies = List(reducedStream) - - override def slideTime: Time = _slideTime - - override val mustCheckpoint = true - - override def parentRememberDuration: Time = rememberDuration + windowTime - - override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { - super.persist(storageLevel) - reducedStream.persist(storageLevel) - this - } - - override def checkpoint(interval: Time): DStream[(K, V)] = { - super.checkpoint(interval) - //reducedStream.checkpoint(interval) - this - } - - override def compute(validTime: Time): Option[RDD[(K, V)]] = { - val reduceF = reduceFunc - val invReduceF = invReduceFunc - - val currentTime = validTime - val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime) - val previousWindow = currentWindow - slideTime - - logDebug("Window time = " + windowTime) - logDebug("Slide time = " + slideTime) - logDebug("ZeroTime = " + zeroTime) - logDebug("Current window = " + currentWindow) - logDebug("Previous window = " + previousWindow) - - // _____________________________ - // | previous window _________|___________________ - // |___________________| current window | --------------> Time - // |_____________________________| - // - // |________ _________| |________ _________| - // | | - // V V - // old RDDs new RDDs - // - - // Get the RDDs of the reduced values in "old time steps" - val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime) - logDebug("# old RDDs = " + oldRDDs.size) - - // Get the RDDs of the reduced values in "new time steps" - val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime) - logDebug("# new RDDs = " + newRDDs.size) - - // Get the RDD of the reduced value of the previous window - val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) - - // Make the list of RDDs that needs to cogrouped together for reducing their reduced values - val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs - - // Cogroup the reduced RDDs and merge the reduced values - val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) - //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ - - val numOldValues = oldRDDs.size - val numNewValues = newRDDs.size - - val mergeValues = (seqOfValues: Seq[Seq[V]]) => { - if (seqOfValues.size != 1 + numOldValues + numNewValues) { - throw new Exception("Unexpected number of sequences of reduced values") - } - // Getting reduced values "old time steps" that will be removed from current window - val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head) - // Getting reduced values "new time steps" - val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) - if (seqOfValues(0).isEmpty) { - // If previous window's reduce value does not exist, then at least new values should exist - if (newValues.isEmpty) { - throw new Exception("Neither previous window has value for key, nor new values found. " + - "Are you sure your key class hashes consistently?") - } - // Reduce the new values - newValues.reduce(reduceF) // return - } else { - // Get the previous window's reduced value - var tempValue = seqOfValues(0).head - // If old values exists, then inverse reduce then from previous value - if (!oldValues.isEmpty) { - tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) - } - // If new values exists, then reduce them with previous value - if (!newValues.isEmpty) { - tempValue = reduceF(tempValue, newValues.reduce(reduceF)) - } - tempValue // return - } - } - - val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) - - Some(mergedValuesRDD) - } - - -} - - diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index fd1fa77a24..aeb7c3eb0e 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -4,9 +4,6 @@ import util.{ManualClock, RecurringTimer, Clock} import spark.SparkEnv import spark.Logging -import scala.collection.mutable.HashMap - - private[streaming] class Scheduler(ssc: StreamingContext) extends Logging { diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala deleted file mode 100644 index a9e37c0ff0..0000000000 --- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala +++ /dev/null @@ -1,107 +0,0 @@ -package spark.streaming - -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - -import java.io._ -import java.net.Socket -import java.util.concurrent.ArrayBlockingQueue - -import scala.collection.mutable.ArrayBuffer -import scala.Serializable - -class SocketInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - bytesToObjects: InputStream => Iterator[T], - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) { - - def createReceiver(): NetworkReceiver[T] = { - new SocketReceiver(id, host, port, bytesToObjects, storageLevel) - } -} - - -class SocketReceiver[T: ClassManifest]( - streamId: Int, - host: String, - port: Int, - bytesToObjects: InputStream => Iterator[T], - storageLevel: StorageLevel - ) extends NetworkReceiver[T](streamId) { - - lazy protected val dataHandler = new DataHandler(this, storageLevel) - - override def getLocationPreference = None - - protected def onStart() { - logInfo("Connecting to " + host + ":" + port) - val socket = new Socket(host, port) - logInfo("Connected to " + host + ":" + port) - dataHandler.start() - val iterator = bytesToObjects(socket.getInputStream()) - while(iterator.hasNext) { - val obj = iterator.next - dataHandler += obj - } - } - - protected def onStop() { - dataHandler.stop() - } - -} - - -object SocketReceiver { - - /** - * This methods translates the data from an inputstream (say, from a socket) - * to '\n' delimited strings and returns an iterator to access the strings. - */ - def bytesToLines(inputStream: InputStream): Iterator[String] = { - val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) - - val iterator = new Iterator[String] { - var gotNext = false - var finished = false - var nextValue: String = null - - private def getNext() { - try { - nextValue = dataInputStream.readLine() - if (nextValue == null) { - finished = true - } - } - gotNext = true - } - - override def hasNext: Boolean = { - if (!finished) { - if (!gotNext) { - getNext() - if (finished) { - dataInputStream.close() - } - } - } - !finished - } - - override def next(): String = { - if (finished) { - throw new NoSuchElementException("End of stream") - } - if (!gotNext) { - getNext() - } - gotNext = false - nextValue - } - } - iterator - } -} diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala deleted file mode 100644 index b7e4c1c30c..0000000000 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ /dev/null @@ -1,84 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.BlockRDD -import spark.Partitioner -import spark.rdd.MapPartitionsRDD -import spark.SparkContext._ -import spark.storage.StorageLevel - -class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( - parent: DStream[(K, V)], - updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], - partitioner: Partitioner, - preservePartitioning: Boolean - ) extends DStream[(K, S)](parent.ssc) { - - super.persist(StorageLevel.MEMORY_ONLY_SER) - - override def dependencies = List(parent) - - override def slideTime = parent.slideTime - - override val mustCheckpoint = true - - override def compute(validTime: Time): Option[RDD[(K, S)]] = { - - // Try to get the previous state RDD - getOrCompute(validTime - slideTime) match { - - case Some(prevStateRDD) => { // If previous state RDD exists - - // Try to get the parent RDD - parent.getOrCompute(validTime) match { - case Some(parentRDD) => { // If parent RDD exists, then compute as usual - - // Define the function for the mapPartition operation on cogrouped RDD; - // first map the cogrouped tuple to tuples of required type, - // and then apply the update function - val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { - val i = iterator.map(t => { - (t._1, t._2._1, t._2._2.headOption) - }) - updateFuncLocal(i) - } - val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) - val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime) - return Some(stateRDD) - } - case None => { // If parent RDD does not exist, then return old state RDD - return Some(prevStateRDD) - } - } - } - - case None => { // If previous session RDD does not exist (first input data) - - // Try to get the parent RDD - parent.getOrCompute(validTime) match { - case Some(parentRDD) => { // If parent RDD exists, then compute as usual - - // Define the function for the mapPartition operation on grouped RDD; - // first map the grouped tuple to tuples of required type, - // and then apply the update function - val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { - updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) - } - - val groupedRDD = parentRDD.groupByKey(partitioner) - val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime + " (first)") - return Some(sessionRDD) - } - case None => { // If parent RDD does not exist, then nothing to do! - //logDebug("Not generating state RDD (no previous state, no parent)") - return None - } - } - } - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 998fea849f..ef73049a81 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -1,10 +1,10 @@ package spark.streaming -import spark.RDD -import spark.Logging -import spark.SparkEnv -import spark.SparkContext +import spark.streaming.dstream._ + +import spark.{RDD, Logging, SparkEnv, SparkContext} import spark.storage.StorageLevel +import spark.util.MetadataCleaner import scala.collection.mutable.Queue @@ -18,7 +18,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.flume.source.avro.AvroFlumeEvent import org.apache.hadoop.fs.Path import java.util.UUID -import spark.util.MetadataCleaner /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -126,7 +125,7 @@ class StreamingContext private ( /** * Create an input stream that pulls messages form a Kafka Broker. * - * @param host Zookeper hostname. + * @param hostname Zookeper hostname. * @param port Zookeper port. * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed @@ -319,7 +318,7 @@ object StreamingContext { protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { if (prefix == null) { - time.millis.toString + time.milliseconds.toString } else if (suffix == null || suffix.length ==0) { prefix + "-" + time.milliseconds } else { diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 480d292d7c..2976e5e87b 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -1,6 +1,11 @@ package spark.streaming -case class Time(millis: Long) { +/** + * This class is simple wrapper class that represents time in UTC. + * @param millis Time in UTC long + */ + +case class Time(private val millis: Long) { def < (that: Time): Boolean = (this.millis < that.millis) @@ -15,7 +20,9 @@ case class Time(millis: Long) { def - (that: Time): Time = Time(millis - that.millis) def * (times: Int): Time = Time(millis * times) - + + def / (that: Time): Long = millis / that.millis + def floor(that: Time): Time = { val t = that.millis val m = math.floor(this.millis / t).toLong diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala deleted file mode 100644 index e4d2a634f5..0000000000 --- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala +++ /dev/null @@ -1,39 +0,0 @@ -package spark.streaming - -import spark.RDD -import spark.rdd.UnionRDD -import spark.storage.StorageLevel - - -class WindowedDStream[T: ClassManifest]( - parent: DStream[T], - _windowTime: Time, - _slideTime: Time) - extends DStream[T](parent.ssc) { - - if (!_windowTime.isMultipleOf(parent.slideTime)) - throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") - - if (!_slideTime.isMultipleOf(parent.slideTime)) - throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") - - parent.persist(StorageLevel.MEMORY_ONLY_SER) - - def windowTime: Time = _windowTime - - override def dependencies = List(parent) - - override def slideTime: Time = _slideTime - - override def parentRememberDuration: Time = rememberDuration + windowTime - - override def compute(validTime: Time): Option[RDD[T]] = { - val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) - Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala new file mode 100644 index 0000000000..2e427dadf7 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala @@ -0,0 +1,39 @@ +package spark.streaming.dstream + +import spark.{RDD, Partitioner} +import spark.rdd.CoGroupedRDD +import spark.streaming.{Time, DStream} + +class CoGroupedDStream[K : ClassManifest]( + parents: Seq[DStream[(_, _)]], + partitioner: Partitioner + ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideTime).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideTime = parents.head.slideTime + + override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { + val part = partitioner + val rdds = parents.flatMap(_.getOrCompute(validTime)) + if (rdds.size > 0) { + val q = new CoGroupedRDD[K](rdds, part) + Some(q) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala new file mode 100644 index 0000000000..41c3af4694 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{Time, StreamingContext} + +/** + * An input stream that always returns the same RDD on each timestep. Useful for testing. + */ +class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) + extends InputDStream[T](ssc_) { + + override def start() {} + + override def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + Some(rdd) + } +} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala new file mode 100644 index 0000000000..d737ba1ecc --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala @@ -0,0 +1,83 @@ +package spark.streaming.dstream + +import java.util.concurrent.ArrayBlockingQueue +import scala.collection.mutable.ArrayBuffer +import spark.Logging +import spark.streaming.util.{RecurringTimer, SystemClock} +import spark.storage.StorageLevel + + +/** + * This is a helper object that manages the data received from the socket. It divides + * the object received into small batches of 100s of milliseconds, pushes them as + * blocks into the block manager and reports the block IDs to the network input + * tracker. It starts two threads, one to periodically start a new batch and prepare + * the previous batch of as a block, the other to push the blocks into the block + * manager. + */ + class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel) + extends Serializable with Logging { + + case class Block(id: String, iterator: Iterator[T], metadata: Any = null) + + val clock = new SystemClock() + val blockInterval = 200L + val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) + val blockStorageLevel = storageLevel + val blocksForPushing = new ArrayBlockingQueue[Block](1000) + val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } + + var currentBuffer = new ArrayBuffer[T] + + def createBlock(blockId: String, iterator: Iterator[T]) : Block = { + new Block(blockId, iterator) + } + + def start() { + blockIntervalTimer.start() + blockPushingThread.start() + logInfo("Data handler started") + } + + def stop() { + blockIntervalTimer.stop() + blockPushingThread.interrupt() + logInfo("Data handler stopped") + } + + def += (obj: T) { + currentBuffer += obj + } + + def updateCurrentBuffer(time: Long) { + try { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[T] + if (newBlockBuffer.size > 0) { + val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval) + val newBlock = createBlock(blockId, newBlockBuffer.toIterator) + blocksForPushing.add(newBlock) + } + } catch { + case ie: InterruptedException => + logInfo("Block interval timer thread interrupted") + case e: Exception => + receiver.stop() + } + } + + def keepPushingBlocks() { + logInfo("Block pushing thread started") + try { + while(true) { + val block = blocksForPushing.take() + receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel) + } + } catch { + case ie: InterruptedException => + logInfo("Block pushing thread interrupted") + case e: Exception => + receiver.stop() + } + } + } \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala new file mode 100644 index 0000000000..8cdaff467b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -0,0 +1,110 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD +import spark.streaming.{StreamingContext, Time} + +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} + +import scala.collection.mutable.HashSet + + +class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( + @transient ssc_ : StreamingContext, + directory: String, + filter: PathFilter = FileInputDStream.defaultPathFilter, + newFilesOnly: Boolean = true) + extends InputDStream[(K, V)](ssc_) { + + @transient private var path_ : Path = null + @transient private var fs_ : FileSystem = null + + var lastModTime = 0L + val lastModTimeFiles = new HashSet[String]() + + def path(): Path = { + if (path_ == null) path_ = new Path(directory) + path_ + } + + def fs(): FileSystem = { + if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + fs_ + } + + override def start() { + if (newFilesOnly) { + lastModTime = System.currentTimeMillis() + } else { + lastModTime = 0 + } + } + + override def stop() { } + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. Hence, new files may have the same modification time as the + * latest modification time in the previous call to this method and the list of files + * maintained is used to filter the one that have been processed. + */ + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + // Create the filter for selecting new files + val newFilter = new PathFilter() { + var latestModTime = 0L + val latestModTimeFiles = new HashSet[String]() + + def accept(path: Path): Boolean = { + if (!filter.accept(path)) { + return false + } else { + val modTime = fs.getFileStatus(path).getModificationTime() + if (modTime < lastModTime){ + return false + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { + return false + } + if (modTime > latestModTime) { + latestModTime = modTime + latestModTimeFiles.clear() + } + latestModTimeFiles += path.toString + return true + } + } + } + + val newFiles = fs.listStatus(path, newFilter) + logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) + if (newFiles.length > 0) { + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles + } + val newRDD = new UnionRDD(ssc.sc, newFiles.map( + file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) + Some(newRDD) + } +} + +object FileInputDStream { + val defaultPathFilter = new PathFilter with Serializable { + def accept(path: Path): Boolean = { + val file = path.getName() + if (file.startsWith(".") || file.endsWith("_tmp")) { + return false + } else { + return true + } + } + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala new file mode 100644 index 0000000000..1cbb4d536e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class FilteredDStream[T: ClassManifest]( + parent: DStream[T], + filterFunc: T => Boolean + ) extends DStream[T](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[T]] = { + parent.getOrCompute(validTime).map(_.filter(filterFunc)) + } +} + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala new file mode 100644 index 0000000000..11ed8cf317 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD +import spark.SparkContext._ + +private[streaming] +class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + flatMapValueFunc: V => TraversableOnce[U] + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala new file mode 100644 index 0000000000..a13b4c9ff9 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + flatMapFunc: T => Traversable[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala new file mode 100644 index 0000000000..7e988cadf4 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -0,0 +1,135 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +import spark.Utils +import spark.storage.StorageLevel + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import scala.collection.JavaConversions._ + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +class FlumeInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(id, host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. */ +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +class FlumeReceiver( + streamId: Int, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent](streamId) { + + lazy val dataHandler = new DataHandler(this, storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)); + val server = new NettyServer(responder, new InetSocketAddress(host, port)); + dataHandler.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + dataHandler.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala new file mode 100644 index 0000000000..41c629a225 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala @@ -0,0 +1,28 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{DStream, Job, Time} + +private[streaming] +class ForEachDStream[T: ClassManifest] ( + parent: DStream[T], + foreachFunc: (RDD[T], Time) => Unit + ) extends DStream[Unit](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[Unit]] = None + + override def generateJob(time: Time): Option[Job] = { + parent.getOrCompute(time) match { + case Some(rdd) => + val jobFunc = () => { + foreachFunc(rdd, time) + } + Some(new Job(time, jobFunc)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala new file mode 100644 index 0000000000..92ea503cae --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala @@ -0,0 +1,17 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class GlommedDStream[T: ClassManifest](parent: DStream[T]) + extends DStream[Array[T]](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[Array[T]]] = { + parent.getOrCompute(validTime).map(_.glom()) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala new file mode 100644 index 0000000000..4959c66b06 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.streaming.{StreamingContext, DStream} + +abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) + extends DStream[T](ssc_) { + + override def dependencies = List() + + override def slideTime = { + if (ssc == null) throw new Exception("ssc is null") + if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") + ssc.graph.batchDuration + } + + def start() + + def stop() +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala new file mode 100644 index 0000000000..a46721af2f --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -0,0 +1,197 @@ +package spark.streaming.dstream + +import spark.Logging +import spark.storage.StorageLevel +import spark.streaming.{Time, DStreamCheckpointData, StreamingContext} + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.message.{Message, MessageSet, MessageAndMetadata} +import kafka.serializer.StringDecoder +import kafka.utils.{Utils, ZKGroupTopicDirs} +import kafka.utils.ZkUtils._ + +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + + +// Key for a specific Kafka Partition: (broker, topic, group, part) +case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) +// NOT USED - Originally intended for fault-tolerance +// Metadata for a Kafka Stream that it sent to the Master +case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) +// NOT USED - Originally intended for fault-tolerance +// Checkpoint data specific to a KafkaInputDstream +case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], + savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param host Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param initialOffsets Optional initial offsets for each of the partitions to consume. + * By default the value is pulled from zookeper. + * @param storageLevel RDD storage level. + */ +class KafkaInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + groupId: String, + topics: Map[String, Int], + initialOffsets: Map[KafkaPartitionKey, Long], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + // Metadata that keeps track of which messages have already been consumed. + var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() + + /* NOT USED - Originally intended for fault-tolerance + + // In case of a failure, the offets for a particular timestamp will be restored. + @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null + + + override protected[streaming] def addMetadata(metadata: Any) { + metadata match { + case x : KafkaInputDStreamMetadata => + savedOffsets(x.timestamp) = x.data + // TOOD: Remove logging + logInfo("New saved Offsets: " + savedOffsets) + case _ => logInfo("Received unknown metadata: " + metadata.toString) + } + } + + override protected[streaming] def updateCheckpointData(currentTime: Time) { + super.updateCheckpointData(currentTime) + if(savedOffsets.size > 0) { + // Find the offets that were stored before the checkpoint was initiated + val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last + val latestOffsets = savedOffsets(key) + logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) + checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) + // TODO: This may throw out offsets that are created after the checkpoint, + // but it's unlikely we'll need them. + savedOffsets.clear() + } + } + + override protected[streaming] def restoreCheckpointData() { + super.restoreCheckpointData() + logInfo("Restoring KafkaDStream checkpoint data.") + checkpointData match { + case x : KafkaDStreamCheckpointData => + restoredOffsets = x.savedOffsets + logInfo("Restored KafkaDStream offsets: " + savedOffsets) + } + } */ + + def createReceiver(): NetworkReceiver[T] = { + new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, + topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], + storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { + + // Timeout for establishing a connection to Zookeper in ms. + val ZK_TIMEOUT = 10000 + + // Handles pushing data into the BlockManager + lazy protected val dataHandler = new DataHandler(this, storageLevel) + // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset + lazy val offsets = HashMap[KafkaPartitionKey, Long]() + // Connection to Kafka + var consumerConnector : ZookeeperConsumerConnector = null + + def onStop() { + dataHandler.stop() + } + + def onStart() { + + // Starting the DataHandler that buffers blocks and pushes them into them BlockManager + dataHandler.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + val zooKeeperEndPoint = host + ":" + port + logInfo("Starting Kafka Consumer Stream with group: " + groupId) + logInfo("Initial offsets: " + initialOffsets.toString) + + // Zookeper connection properties + val props = new Properties() + props.put("zk.connect", zooKeeperEndPoint) + props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString) + props.put("groupid", groupId) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + zooKeeperEndPoint) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] + logInfo("Connected to " + zooKeeperEndPoint) + + // Reset the Kafka offsets in case we are recovering from a failure + resetOffsets(initialOffsets) + + // Create Threads for each Topic/Message Stream we are listening + val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + + } + + // Overwrites the offets in Zookeper. + private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { + offsets.foreach { case(key, offset) => + val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) + val partitionName = key.brokerId + "-" + key.partId + updatePersistentPath(consumerConnector.zkClient, + topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) + } + } + + // Handles Kafka Messages + private class MessageHandler(stream: KafkaStream[String]) extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + stream.takeWhile { msgAndMetadata => + dataHandler += msgAndMetadata.message + + // Updating the offet. The key is (broker, topic, group, partition). + val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, + groupId, msgAndMetadata.topicInfo.partition.partId) + val offset = msgAndMetadata.topicInfo.getConsumeOffset + offsets.put(key, offset) + // logInfo("Handled message: " + (key, offset).toString) + + // Keep on handling messages + true + } + } + } + + // NOT USED - Originally intended for fault-tolerance + // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) + // extends DataHandler[Any](receiver, storageLevel) { + + // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { + // // Creates a new Block with Kafka-specific Metadata + // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) + // } + + // } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala new file mode 100644 index 0000000000..daf78c6893 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala new file mode 100644 index 0000000000..689caeef0e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala @@ -0,0 +1,21 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD +import spark.SparkContext._ + +private[streaming] +class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + mapValueFunc: V => U + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala new file mode 100644 index 0000000000..786b9966f2 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala @@ -0,0 +1,20 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD + +private[streaming] +class MappedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + mapFunc: T => U + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.map[U](mapFunc)) + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala new file mode 100644 index 0000000000..41276da8bb --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -0,0 +1,157 @@ +package spark.streaming.dstream + +import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver} + +import spark.{Logging, SparkEnv, RDD} +import spark.rdd.BlockRDD +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +import java.nio.ByteBuffer + +import akka.actor.{Props, Actor} +import akka.pattern.ask +import akka.dispatch.Await +import akka.util.duration._ + +abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) + extends InputDStream[T](ssc_) { + + // This is an unique identifier that is used to match the network receiver with the + // corresponding network input stream. + val id = ssc.getNewNetworkStreamId() + + /** + * This method creates the receiver object that will be sent to the workers + * to receive data. This method needs to defined by any specific implementation + * of a NetworkInputDStream. + */ + def createReceiver(): NetworkReceiver[T] + + // Nothing to start or stop as both taken care of by the NetworkInputTracker. + def start() {} + + def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } +} + + +private[streaming] sealed trait NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage + +abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging { + + initLogging() + + lazy protected val env = SparkEnv.get + + lazy protected val actor = env.actorSystem.actorOf( + Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) + + lazy protected val receivingThread = Thread.currentThread() + + /** This method will be called to start receiving data. */ + protected def onStart() + + /** This method will be called to stop receiving data. */ + protected def onStop() + + /** This method conveys a placement preference (hostname) for this receiver. */ + def getLocationPreference() : Option[String] = None + + /** + * This method starts the receiver. First is accesses all the lazy members to + * materialize them. Then it calls the user-defined onStart() method to start + * other threads, etc required to receiver the data. + */ + def start() { + try { + // Access the lazy vals to materialize them + env + actor + receivingThread + + // Call user-defined onStart() + onStart() + } catch { + case ie: InterruptedException => + logInfo("Receiving thread interrupted") + //println("Receiving thread interrupted") + case e: Exception => + stopOnError(e) + } + } + + /** + * This method stops the receiver. First it interrupts the main receiving thread, + * that is, the thread that called receiver.start(). Then it calls the user-defined + * onStop() method to stop other threads and/or do cleanup. + */ + def stop() { + receivingThread.interrupt() + onStop() + //TODO: terminate the actor + } + + /** + * This method stops the receiver and reports to exception to the tracker. + * This should be called whenever an exception has happened on any thread + * of the receiver. + */ + protected def stopOnError(e: Exception) { + logError("Error receiving data", e) + stop() + actor ! ReportError(e.toString) + } + + + /** + * This method pushes a block (as iterator of values) into the block manager. + */ + def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) { + val buffer = new ArrayBuffer[T] ++ iterator + env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level) + + actor ! ReportBlock(blockId, metadata) + } + + /** + * This method pushes a block (as bytes) into the block manager. + */ + def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + env.blockManager.putBytes(blockId, bytes, level) + actor ! ReportBlock(blockId, metadata) + } + + /** A helper actor that communicates with the NetworkInputTracker */ + private class NetworkReceiverActor extends Actor { + logInfo("Attempting to register with tracker") + val ip = System.getProperty("spark.master.host", "localhost") + val port = System.getProperty("spark.master.port", "7077").toInt + val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val tracker = env.actorSystem.actorFor(url) + val timeout = 5.seconds + + override def preStart() { + val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) + Await.result(future, timeout) + } + + override def receive() = { + case ReportBlock(blockId, metadata) => + tracker ! AddBlocks(streamId, Array(blockId), metadata) + case ReportError(msg) => + tracker ! DeregisterReceiver(streamId, msg) + case StopReceiver(msg) => + stop() + tracker ! DeregisterReceiver(streamId, msg) + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala new file mode 100644 index 0000000000..024bf3bea4 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala @@ -0,0 +1,41 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD + +import scala.collection.mutable.Queue +import scala.collection.mutable.ArrayBuffer +import spark.streaming.{Time, StreamingContext} + +class QueueInputDStream[T: ClassManifest]( + @transient ssc: StreamingContext, + val queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] + ) extends InputDStream[T](ssc) { + + override def start() { } + + override def stop() { } + + override def compute(validTime: Time): Option[RDD[T]] = { + val buffer = new ArrayBuffer[RDD[T]]() + if (oneAtATime && queue.size > 0) { + buffer += queue.dequeue() + } else { + buffer ++= queue + } + if (buffer.size > 0) { + if (oneAtATime) { + Some(buffer.first) + } else { + Some(new UnionRDD(ssc.sc, buffer.toSeq)) + } + } else if (defaultRDD != null) { + Some(defaultRDD) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala new file mode 100644 index 0000000000..996cc7dea8 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -0,0 +1,88 @@ +package spark.streaming.dstream + +import spark.{DaemonThread, Logging} +import spark.storage.StorageLevel +import spark.streaming.StreamingContext + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.{ReadableByteChannel, SocketChannel} +import java.io.EOFException +import java.util.concurrent.ArrayBlockingQueue + + +/** + * An input stream that reads blocks of serialized objects from a given network address. + * The blocks will be inserted directly into the block store. This is the fastest way to get + * data into Spark Streaming, though it requires the sender to batch data and serialize it + * in the format that the system is configured with. + */ +class RawInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + def createReceiver(): NetworkReceiver[T] = { + new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel) + extends NetworkReceiver[Any](streamId) { + + var blockPushingThread: Thread = null + + override def getLocationPreference = None + + def onStart() { + // Open a socket to the target address and keep reading from it + logInfo("Connecting to " + host + ":" + port) + val channel = SocketChannel.open() + channel.configureBlocking(true) + channel.connect(new InetSocketAddress(host, port)) + logInfo("Connected to " + host + ":" + port) + + val queue = new ArrayBlockingQueue[ByteBuffer](2) + + blockPushingThread = new DaemonThread { + override def run() { + var nextBlockNumber = 0 + while (true) { + val buffer = queue.take() + val blockId = "input-" + streamId + "-" + nextBlockNumber + nextBlockNumber += 1 + pushBlock(blockId, buffer, null, storageLevel) + } + } + } + blockPushingThread.start() + + val lengthBuffer = ByteBuffer.allocate(4) + while (true) { + lengthBuffer.clear() + readFully(channel, lengthBuffer) + lengthBuffer.flip() + val length = lengthBuffer.getInt() + val dataBuffer = ByteBuffer.allocate(length) + readFully(channel, dataBuffer) + dataBuffer.flip() + logInfo("Read a block with " + length + " bytes") + queue.put(dataBuffer) + } + } + + def onStop() { + if (blockPushingThread != null) blockPushingThread.interrupt() + } + + /** Read a buffer fully from a given Channel */ + private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) { + while (dest.position < dest.limit) { + if (channel.read(dest) == -1) { + throw new EOFException("End of channel") + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala new file mode 100644 index 0000000000..2686de14d2 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -0,0 +1,148 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext._ + +import spark.RDD +import spark.rdd.CoGroupedRDD +import spark.Partitioner +import spark.SparkContext._ +import spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import spark.streaming.{Interval, Time, DStream} + +class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( + parent: DStream[(K, V)], + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + _windowTime: Time, + _slideTime: Time, + partitioner: Partitioner + ) extends DStream[(K,V)](parent.ssc) { + + assert(_windowTime.isMultipleOf(parent.slideTime), + "The window duration of ReducedWindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" + ) + + assert(_slideTime.isMultipleOf(parent.slideTime), + "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" + ) + + // Reduce each batch of data using reduceByKey which will be further reduced by window + // by ReducedWindowedDStream + val reducedStream = parent.reduceByKey(reduceFunc, partitioner) + + // Persist RDDs to memory by default as these RDDs are going to be reused. + super.persist(StorageLevel.MEMORY_ONLY_SER) + reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowTime: Time = _windowTime + + override def dependencies = List(reducedStream) + + override def slideTime: Time = _slideTime + + override val mustCheckpoint = true + + override def parentRememberDuration: Time = rememberDuration + windowTime + + override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { + super.persist(storageLevel) + reducedStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Time): DStream[(K, V)] = { + super.checkpoint(interval) + //reducedStream.checkpoint(interval) + this + } + + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + val reduceF = reduceFunc + val invReduceF = invReduceFunc + + val currentTime = validTime + val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime) + val previousWindow = currentWindow - slideTime + + logDebug("Window time = " + windowTime) + logDebug("Slide time = " + slideTime) + logDebug("ZeroTime = " + zeroTime) + logDebug("Current window = " + currentWindow) + logDebug("Previous window = " + previousWindow) + + // _____________________________ + // | previous window _________|___________________ + // |___________________| current window | --------------> Time + // |_____________________________| + // + // |________ _________| |________ _________| + // | | + // V V + // old RDDs new RDDs + // + + // Get the RDDs of the reduced values in "old time steps" + val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime) + logDebug("# old RDDs = " + oldRDDs.size) + + // Get the RDDs of the reduced values in "new time steps" + val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime) + logDebug("# new RDDs = " + newRDDs.size) + + // Get the RDD of the reduced value of the previous window + val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) + + // Make the list of RDDs that needs to cogrouped together for reducing their reduced values + val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs + + // Cogroup the reduced RDDs and merge the reduced values + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner) + //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + + val numOldValues = oldRDDs.size + val numNewValues = newRDDs.size + + val mergeValues = (seqOfValues: Seq[Seq[V]]) => { + if (seqOfValues.size != 1 + numOldValues + numNewValues) { + throw new Exception("Unexpected number of sequences of reduced values") + } + // Getting reduced values "old time steps" that will be removed from current window + val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head) + // Getting reduced values "new time steps" + val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) + if (seqOfValues(0).isEmpty) { + // If previous window's reduce value does not exist, then at least new values should exist + if (newValues.isEmpty) { + throw new Exception("Neither previous window has value for key, nor new values found. " + + "Are you sure your key class hashes consistently?") + } + // Reduce the new values + newValues.reduce(reduceF) // return + } else { + // Get the previous window's reduced value + var tempValue = seqOfValues(0).head + // If old values exists, then inverse reduce then from previous value + if (!oldValues.isEmpty) { + tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) + } + // If new values exists, then reduce them with previous value + if (!newValues.isEmpty) { + tempValue = reduceF(tempValue, newValues.reduce(reduceF)) + } + tempValue // return + } + } + + val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) + + Some(mergedValuesRDD) + } + + +} + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala new file mode 100644 index 0000000000..6854bbe665 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala @@ -0,0 +1,27 @@ +package spark.streaming.dstream + +import spark.{RDD, Partitioner} +import spark.SparkContext._ +import spark.streaming.{DStream, Time} + +private[streaming] +class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + parent: DStream[(K,V)], + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner + ) extends DStream [(K,C)] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[(K,C)]] = { + parent.getOrCompute(validTime) match { + case Some(rdd) => + Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala new file mode 100644 index 0000000000..af5b73ae8d --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -0,0 +1,103 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext +import spark.storage.StorageLevel + +import java.io._ +import java.net.Socket + +class SocketInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) { + + def createReceiver(): NetworkReceiver[T] = { + new SocketReceiver(id, host, port, bytesToObjects, storageLevel) + } +} + + +class SocketReceiver[T: ClassManifest]( + streamId: Int, + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkReceiver[T](streamId) { + + lazy protected val dataHandler = new DataHandler(this, storageLevel) + + override def getLocationPreference = None + + protected def onStart() { + logInfo("Connecting to " + host + ":" + port) + val socket = new Socket(host, port) + logInfo("Connected to " + host + ":" + port) + dataHandler.start() + val iterator = bytesToObjects(socket.getInputStream()) + while(iterator.hasNext) { + val obj = iterator.next + dataHandler += obj + } + } + + protected def onStop() { + dataHandler.stop() + } + +} + + +object SocketReceiver { + + /** + * This methods translates the data from an inputstream (say, from a socket) + * to '\n' delimited strings and returns an iterator to access the strings. + */ + def bytesToLines(inputStream: InputStream): Iterator[String] = { + val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) + + val iterator = new Iterator[String] { + var gotNext = false + var finished = false + var nextValue: String = null + + private def getNext() { + try { + nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true + } + } + gotNext = true + } + + override def hasNext: Boolean = { + if (!finished) { + if (!gotNext) { + getNext() + if (finished) { + dataInputStream.close() + } + } + } + !finished + } + + override def next(): String = { + if (finished) { + throw new NoSuchElementException("End of stream") + } + if (!gotNext) { + getNext() + } + gotNext = false + nextValue + } + } + iterator + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala new file mode 100644 index 0000000000..6e190b5564 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -0,0 +1,83 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.Partitioner +import spark.SparkContext._ +import spark.storage.StorageLevel +import spark.streaming.{Time, DStream} + +class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( + parent: DStream[(K, V)], + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + preservePartitioning: Boolean + ) extends DStream[(K, S)](parent.ssc) { + + super.persist(StorageLevel.MEMORY_ONLY_SER) + + override def dependencies = List(parent) + + override def slideTime = parent.slideTime + + override val mustCheckpoint = true + + override def compute(validTime: Time): Option[RDD[(K, S)]] = { + + // Try to get the previous state RDD + getOrCompute(validTime - slideTime) match { + + case Some(prevStateRDD) => { // If previous state RDD exists + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on cogrouped RDD; + // first map the cogrouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val i = iterator.map(t => { + (t._1, t._2._1, t._2._2.headOption) + }) + updateFuncLocal(i) + } + val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) + val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime) + return Some(stateRDD) + } + case None => { // If parent RDD does not exist, then return old state RDD + return Some(prevStateRDD) + } + } + } + + case None => { // If previous session RDD does not exist (first input data) + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on grouped RDD; + // first map the grouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) + } + + val groupedRDD = parentRDD.groupByKey(partitioner) + val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime + " (first)") + return Some(sessionRDD) + } + case None => { // If parent RDD does not exist, then nothing to do! + //logDebug("Not generating state RDD (no previous state, no parent)") + return None + } + } + } + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala new file mode 100644 index 0000000000..0337579514 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala @@ -0,0 +1,19 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.streaming.{DStream, Time} + +private[streaming] +class TransformedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + transformFunc: (RDD[T], Time) => RDD[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala new file mode 100644 index 0000000000..f1efb2ae72 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala @@ -0,0 +1,39 @@ +package spark.streaming.dstream + +import spark.streaming.{DStream, Time} +import spark.RDD +import collection.mutable.ArrayBuffer +import spark.rdd.UnionRDD + +class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) + extends DStream[T](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideTime).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideTime: Time = parents.head.slideTime + + override def compute(validTime: Time): Option[RDD[T]] = { + val rdds = new ArrayBuffer[RDD[T]]() + parents.map(_.getOrCompute(validTime)).foreach(_ match { + case Some(rdd) => rdds += rdd + case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) + }) + if (rdds.size > 0) { + Some(new UnionRDD(ssc.sc, rdds)) + } else { + None + } + } +} diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala new file mode 100644 index 0000000000..4b2621c497 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala @@ -0,0 +1,40 @@ +package spark.streaming.dstream + +import spark.RDD +import spark.rdd.UnionRDD +import spark.storage.StorageLevel +import spark.streaming.{Interval, Time, DStream} + + +class WindowedDStream[T: ClassManifest]( + parent: DStream[T], + _windowTime: Time, + _slideTime: Time) + extends DStream[T](parent.ssc) { + + if (!_windowTime.isMultipleOf(parent.slideTime)) + throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + + if (!_slideTime.isMultipleOf(parent.slideTime)) + throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + + parent.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowTime: Time = _windowTime + + override def dependencies = List(parent) + + override def slideTime: Time = _slideTime + + override def parentRememberDuration: Time = rememberDuration + windowTime + + override def compute(validTime: Time): Option[RDD[T]] = { + val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) + Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) + } +} + + + diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index 7c4ee3b34c..dfaaf03f03 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -25,7 +25,7 @@ object GrepRaw { val rawStreams = (1 to numStreams).map(_ => ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray - val union = new UnionDStream(rawStreams) + val union = ssc.union(rawStreams) union.filter(_.contains("Alice")).count().foreach(r => println("Grep count: " + r.collect().mkString)) ssc.start() diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 182dfd8a52..338834bc3c 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -34,7 +34,7 @@ object TopKWordCountRaw { val lines = (1 to numStreams).map(_ => { ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) }) - val union = new UnionDStream(lines.toArray) + val union = ssc.union(lines) val counts = union.mapPartitions(splitAndCountPartitions) val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index 9bcd30f4d7..d93335a8ce 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -33,7 +33,7 @@ object WordCountRaw { val lines = (1 to numStreams).map(_ => { ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) }) - val union = new UnionDStream(lines.toArray) + val union = ssc.union(lines) val counts = union.mapPartitions(splitAndCountPartitions) val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) windowedCounts.foreach(r => println("# unique words = " + r.count())) diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala deleted file mode 100644 index 7c642d4802..0000000000 --- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala +++ /dev/null @@ -1,193 +0,0 @@ -package spark.streaming - -import java.util.Properties -import java.util.concurrent.Executors -import kafka.consumer._ -import kafka.message.{Message, MessageSet, MessageAndMetadata} -import kafka.serializer.StringDecoder -import kafka.utils.{Utils, ZKGroupTopicDirs} -import kafka.utils.ZkUtils._ -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import spark._ -import spark.RDD -import spark.storage.StorageLevel - -// Key for a specific Kafka Partition: (broker, topic, group, part) -case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) -// NOT USED - Originally intended for fault-tolerance -// Metadata for a Kafka Stream that it sent to the Master -case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) -// NOT USED - Originally intended for fault-tolerance -// Checkpoint data specific to a KafkaInputDstream -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], - savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) - -/** - * Input stream that pulls messages form a Kafka Broker. - * - * @param host Zookeper hostname. - * @param port Zookeper port. - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. - * By default the value is pulled from zookeper. - * @param storageLevel RDD storage level. - */ -class KafkaInputDStream[T: ClassManifest]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - groupId: String, - topics: Map[String, Int], - initialOffsets: Map[KafkaPartitionKey, Long], - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { - - // Metadata that keeps track of which messages have already been consumed. - var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() - - /* NOT USED - Originally intended for fault-tolerance - - // In case of a failure, the offets for a particular timestamp will be restored. - @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null - - - override protected[streaming] def addMetadata(metadata: Any) { - metadata match { - case x : KafkaInputDStreamMetadata => - savedOffsets(x.timestamp) = x.data - // TOOD: Remove logging - logInfo("New saved Offsets: " + savedOffsets) - case _ => logInfo("Received unknown metadata: " + metadata.toString) - } - } - - override protected[streaming] def updateCheckpointData(currentTime: Time) { - super.updateCheckpointData(currentTime) - if(savedOffsets.size > 0) { - // Find the offets that were stored before the checkpoint was initiated - val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last - val latestOffsets = savedOffsets(key) - logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) - checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) - // TODO: This may throw out offsets that are created after the checkpoint, - // but it's unlikely we'll need them. - savedOffsets.clear() - } - } - - override protected[streaming] def restoreCheckpointData() { - super.restoreCheckpointData() - logInfo("Restoring KafkaDStream checkpoint data.") - checkpointData match { - case x : KafkaDStreamCheckpointData => - restoredOffsets = x.savedOffsets - logInfo("Restored KafkaDStream offsets: " + savedOffsets) - } - } */ - - def createReceiver(): NetworkReceiver[T] = { - new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel) - .asInstanceOf[NetworkReceiver[T]] - } -} - -class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, - topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], - storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { - - // Timeout for establishing a connection to Zookeper in ms. - val ZK_TIMEOUT = 10000 - - // Handles pushing data into the BlockManager - lazy protected val dataHandler = new DataHandler(this, storageLevel) - // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset - lazy val offsets = HashMap[KafkaPartitionKey, Long]() - // Connection to Kafka - var consumerConnector : ZookeeperConsumerConnector = null - - def onStop() { - dataHandler.stop() - } - - def onStart() { - - // Starting the DataHandler that buffers blocks and pushes them into them BlockManager - dataHandler.start() - - // In case we are using multiple Threads to handle Kafka Messages - val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - - val zooKeeperEndPoint = host + ":" + port - logInfo("Starting Kafka Consumer Stream with group: " + groupId) - logInfo("Initial offsets: " + initialOffsets.toString) - - // Zookeper connection properties - val props = new Properties() - props.put("zk.connect", zooKeeperEndPoint) - props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString) - props.put("groupid", groupId) - - // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + zooKeeperEndPoint) - val consumerConfig = new ConsumerConfig(props) - consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] - logInfo("Connected to " + zooKeeperEndPoint) - - // Reset the Kafka offsets in case we are recovering from a failure - resetOffsets(initialOffsets) - - // Create Threads for each Topic/Message Stream we are listening - val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) - - // Start the messages handler for each partition - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } - } - - } - - // Overwrites the offets in Zookeper. - private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { - offsets.foreach { case(key, offset) => - val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) - val partitionName = key.brokerId + "-" + key.partId - updatePersistentPath(consumerConnector.zkClient, - topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) - } - } - - // Handles Kafka Messages - private class MessageHandler(stream: KafkaStream[String]) extends Runnable { - def run() { - logInfo("Starting MessageHandler.") - stream.takeWhile { msgAndMetadata => - dataHandler += msgAndMetadata.message - - // Updating the offet. The key is (broker, topic, group, partition). - val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, - groupId, msgAndMetadata.topicInfo.partition.partId) - val offset = msgAndMetadata.topicInfo.getConsumeOffset - offsets.put(key, offset) - // logInfo("Handled message: " + (key, offset).toString) - - // Keep on handling messages - true - } - } - } - - // NOT USED - Originally intended for fault-tolerance - // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) - // extends DataHandler[Any](receiver, storageLevel) { - - // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { - // // Creates a new Block with Kafka-specific Metadata - // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) - // } - - // } - -} diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 0d82b2f1ea..920388bba9 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val stateStreamCheckpointInterval = Seconds(1) // this ensure checkpointing occurs at least once - val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2 + val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2 val secondNumBatches = firstNumBatches // Setup the streams diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 5b414117fc..4aa428bf64 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { // Get the output buffer val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] val output = outputStream.output - val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong + val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong val startTime = System.currentTimeMillis() try { diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index ed9a659092..76b528bec3 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -1,5 +1,6 @@ package spark.streaming +import dstream.SparkFlumeEvent import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index a44f738957..28bdd53c3c 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -1,12 +1,16 @@ package spark.streaming +import spark.streaming.dstream.{InputDStream, ForEachDStream} +import spark.streaming.util.ManualClock + import spark.{RDD, Logging} -import util.ManualClock + import collection.mutable.ArrayBuffer -import org.scalatest.FunSuite import collection.mutable.SynchronizedBuffer + import java.io.{ObjectInputStream, IOException} +import org.scalatest.FunSuite /** * This is a input stream just for the testsuites. This is equivalent to a checkpointable, @@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging { def actuallyWait = false + /** + * Set up required DStreams to test the DStream operation using the two sequences + * of input collections. + */ def setupStreams[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V] @@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging { ssc } + /** + * Set up required DStreams to test the binary operation using the sequence + * of input collections. + */ def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], @@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging { output } + /** + * Verify whether the output values after running a DStream operation + * is same as the expected output values, by comparing the output + * collections either as lists (order matters) or sets (order does not matter) + */ def verifyOutput[V: ClassManifest]( output: Seq[Seq[V]], expectedOutput: Seq[Seq[V]], @@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging { logInfo("Output verified successfully") } + /** + * Test unary DStream operation with a list of inputs, with number of + * batches to run same as the number of expected output values + */ def testOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], @@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging { testOperation[U, V](input, operation, expectedOutput, -1, useSet) } + /** + * Test unary DStream operation with a list of inputs + * @param input Sequence of input collections + * @param operation Binary DStream operation to be applied to the 2 inputs + * @param expectedOutput Sequence of expected output collections + * @param numBatches Number of batches to run the operation for + * @param useSet Compare the output values with the expected output values + * as sets (order matters) or as lists (order does not matter) + */ def testOperation[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], @@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging { verifyOutput[V](output, expectedOutput, useSet) } + /** + * Test binary DStream operation with two lists of inputs, with number of + * batches to run same as the number of expected output values + */ def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], @@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging { testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet) } + /** + * Test binary DStream operation with two lists of inputs + * @param input1 First sequence of input collections + * @param input2 Second sequence of input collections + * @param operation Binary DStream operation to be applied to the 2 inputs + * @param expectedOutput Sequence of expected output collections + * @param numBatches Number of batches to run the operation for + * @param useSet Compare the output values with the expected output values + * as sets (order matters) or as lists (order does not matter) + */ def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 3e20e16708..4bc5229465 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase { val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet))) val windowTime = Seconds(2) val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.groupByKeyAndWindow(windowTime, slideTime) .map(x => (x._1, x._2.toSet)) @@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase { val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0)) val windowTime = Seconds(2) val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime) testOperation(input, operation, expectedOutput, numBatches, true) } @@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase { val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3))) val windowTime = Seconds(2) val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt)) } @@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideTime: Time = Seconds(1) ) { test("window - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[Int]) => s.window(windowTime, slideTime) testOperation(input, operation, expectedOutput, numBatches, true) } @@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideTime: Time = Seconds(1) ) { test("reduceByKeyAndWindow - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist() } @@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase { slideTime: Time = Seconds(1) ) { test("reduceByKeyAndWindowInv - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime) .persist() -- cgit v1.2.3 From 18b9b3b99fd753899d19bd10f0dbef7d5c4ae8d7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 30 Dec 2012 20:00:42 -0800 Subject: More classes made private[streaming] to hide from scala docs. --- .gitignore | 2 + .../src/main/scala/spark/streaming/DStream.scala | 2 +- .../main/scala/spark/streaming/JobManager.scala | 2 +- .../src/main/scala/spark/streaming/Scheduler.scala | 2 +- .../scala/spark/streaming/StreamingContext.scala | 108 ++++++++++++++++----- .../src/main/scala/spark/streaming/Time.scala | 30 ++++-- .../spark/streaming/dstream/CoGroupedDStream.scala | 1 + .../spark/streaming/dstream/FileInputDStream.scala | 2 +- .../streaming/dstream/FlumeInputDStream.scala | 13 ++- .../streaming/dstream/KafkaInputDStream.scala | 6 +- .../spark/streaming/dstream/RawInputDStream.scala | 2 + .../streaming/dstream/ReducedWindowedDStream.scala | 1 + .../streaming/dstream/SocketInputDStream.scala | 5 +- .../spark/streaming/dstream/StateDStream.scala | 1 + .../spark/streaming/dstream/UnionDStream.scala | 1 + .../spark/streaming/dstream/WindowedDStream.scala | 2 +- .../main/scala/spark/streaming/util/Clock.scala | 8 +- .../spark/streaming/util/RecurringTimer.scala | 1 + 18 files changed, 137 insertions(+), 52 deletions(-) (limited to 'streaming') diff --git a/.gitignore b/.gitignore index c207409e3c..88d7b56181 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ third_party/libmesos.so third_party/libmesos.dylib conf/java-opts conf/spark-env.sh +conf/streaming-env.sh conf/log4j.properties docs/_site docs/api @@ -31,4 +32,5 @@ project/plugins/src_managed/ logs/ log/ spark-tests.log +streaming-tests.log dependency-reduced-pom.xml diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 292ad3b9f9..beba9cfd4f 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -189,7 +189,7 @@ abstract class DStream[T: ClassManifest] ( val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds logInfo("metadataCleanupDelay = " + metadataCleanerDelay) assert( - metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000, + metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, "It seems you are doing some DStream window operation or setting a checkpoint interval " + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" + diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index fda7264a27..3b910538e0 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -14,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { try { val timeTaken = job.run() logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( - (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0)) + (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0)) } catch { case e: Exception => logError("Running " + job + " failed", e) diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index aeb7c3eb0e..eb40affe6d 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -22,7 +22,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] - val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_)) + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_)) def start() { // If context was started from checkpoint, then restart timer such that diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index ef73049a81..7256e41af9 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -15,7 +15,6 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.flume.source.avro.AvroFlumeEvent import org.apache.hadoop.fs.Path import java.util.UUID @@ -101,14 +100,27 @@ class StreamingContext private ( protected[streaming] var receiverJobThread: Thread = null protected[streaming] var scheduler: Scheduler = null + /** + * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * DStreams remember RDDs only for a limited duration of time and releases them for garbage + * collection. This method allows the developer to specify how to long to remember the RDDs ( + * if the developer wishes to query old data outside the DStream computation). + * @param duration Minimum duration that each DStream should remember its RDDs + */ def remember(duration: Time) { graph.remember(duration) } - def checkpoint(dir: String, interval: Time = null) { - if (dir != null) { - sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir)) - checkpointDir = dir + /** + * Sets the context to periodically checkpoint the DStream operations for master + * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + * @param interval checkpoint interval + */ + def checkpoint(directory: String, interval: Time = null) { + if (directory != null) { + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) + checkpointDir = directory checkpointInterval = interval } else { checkpointDir = null @@ -122,9 +134,8 @@ class StreamingContext private ( protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() - /** + /** * Create an input stream that pulls messages form a Kafka Broker. - * * @param hostname Zookeper hostname. * @param port Zookeper port. * @param groupId The group id for this consumer. @@ -147,6 +158,15 @@ class StreamingContext private ( inputStream } + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * lines. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ def networkTextStream( hostname: String, port: Int, @@ -155,6 +175,16 @@ class StreamingContext private ( networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes it interepreted as object using the given + * converter. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param converter Function to convert the byte stream to objects + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects received (after converting bytes to objects) + */ def networkStream[T: ClassManifest]( hostname: String, port: Int, @@ -166,16 +196,32 @@ class StreamingContext private ( inputStream } + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ def flumeStream ( - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = { + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) graph.addInputStream(inputStream) inputStream } - + /** + * Create a input stream from network source hostname:port, where data is received + * as serialized blocks (serialized using the Spark's serializer) that can be directly + * pushed into the block manager without deserializing them. This is the most efficient + * way to receive data. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects in the received blocks + */ def rawNetworkStream[T: ClassManifest]( hostname: String, port: Int, @@ -188,7 +234,11 @@ class StreamingContext private ( /** * Creates a input stream that monitors a Hadoop-compatible filesystem - * for new files and executes the necessary processing on them. + * for new files and reads them using the given key-value types and input format. + * @param directory HDFS directory to monitor for new file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file */ def fileStream[ K: ClassManifest, @@ -200,13 +250,23 @@ class StreamingContext private ( inputStream } + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as text files (using key as LongWritable, value + * as Text and input format as TextInputFormat). + * @param directory HDFS directory to monitor for new file + */ def textFileStream(directory: String): DStream[String] = { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } /** * Creates a input stream from an queue of RDDs. In each batch, - * it will process either one or all of the RDDs returned by the queue + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty + * @tparam T Type of objects in the RDD */ def queueStream[T: ClassManifest]( queue: Queue[RDD[T]], @@ -218,13 +278,9 @@ class StreamingContext private ( inputStream } - def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = { - val queue = new Queue[RDD[T]] - val inputStream = queueStream(queue, true, null) - queue ++= array - inputStream - } - + /** + * Create a unified DStream from multiple DStreams of the same type and same interval + */ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { new UnionDStream[T](streams.toArray) } @@ -256,7 +312,7 @@ class StreamingContext private ( } /** - * This function starts the execution of the streams. + * Starts the execution of the streams. */ def start() { if (checkpointDir != null && checkpointInterval == null && graph != null) { @@ -284,7 +340,7 @@ class StreamingContext private ( } /** - * This function stops the execution of the streams. + * Sstops the execution of the streams. */ def stop() { try { @@ -302,6 +358,10 @@ class StreamingContext private ( object StreamingContext { + implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { + new PairDStreamFunctions[K, V](stream) + } + protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = { // Set the default cleaner delay to an hour if not already set. @@ -312,10 +372,6 @@ object StreamingContext { new SparkContext(master, frameworkName) } - implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { - new PairDStreamFunctions[K, V](stream) - } - protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { if (prefix == null) { time.milliseconds.toString diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 2976e5e87b..3c6fd5d967 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -1,16 +1,18 @@ package spark.streaming /** - * This class is simple wrapper class that represents time in UTC. - * @param millis Time in UTC long + * This is a simple class that represents time. Internally, it represents time as UTC. + * The recommended way to create instances of Time is to use helper objects + * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]]. + * @param millis Time in UTC. */ case class Time(private val millis: Long) { def < (that: Time): Boolean = (this.millis < that.millis) - + def <= (that: Time): Boolean = (this.millis <= that.millis) - + def > (that: Time): Boolean = (this.millis > that.millis) def >= (that: Time): Boolean = (this.millis >= that.millis) @@ -45,23 +47,33 @@ case class Time(private val millis: Long) { def milliseconds: Long = millis } -object Time { +private[streaming] object Time { val zero = Time(0) implicit def toTime(long: Long) = Time(long) - - implicit def toLong(time: Time) = time.milliseconds } +/** + * Helper object that creates instance of [[spark.streaming.Time]] representing + * a given number of milliseconds. + */ object Milliseconds { def apply(milliseconds: Long) = Time(milliseconds) } +/** + * Helper object that creates instance of [[spark.streaming.Time]] representing + * a given number of seconds. + */ object Seconds { def apply(seconds: Long) = Time(seconds * 1000) -} +} -object Minutes { +/** + * Helper object that creates instance of [[spark.streaming.Time]] representing + * a given number of minutes. + */ +object Minutes { def apply(minutes: Long) = Time(minutes * 60000) } diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala index 2e427dadf7..bc23d423d3 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala @@ -4,6 +4,7 @@ import spark.{RDD, Partitioner} import spark.rdd.CoGroupedRDD import spark.streaming.{Time, DStream} +private[streaming] class CoGroupedDStream[K : ClassManifest]( parents: Seq[DStream[(_, _)]], partitioner: Partitioner diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index 8cdaff467b..cf72095324 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -10,7 +10,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import scala.collection.mutable.HashSet - +private[streaming] class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @transient ssc_ : StreamingContext, directory: String, diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala index 7e988cadf4..ff73225e0f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -17,6 +17,7 @@ import java.net.InetSocketAddress import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.nio.ByteBuffer +private[streaming] class FlumeInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, host: String, @@ -93,6 +94,7 @@ private[streaming] object SparkFlumeEvent { } /** A simple server that implements Flume's Avro protocol. */ +private[streaming] class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { override def append(event : AvroFlumeEvent) : Status = { receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) @@ -108,12 +110,13 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { /** A NetworkReceiver which listens for events using the * Flume Avro interface.*/ +private[streaming] class FlumeReceiver( - streamId: Int, - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent](streamId) { + streamId: Int, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent](streamId) { lazy val dataHandler = new DataHandler(this, storageLevel) diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index a46721af2f..175c75bcb9 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -21,10 +21,12 @@ import scala.collection.JavaConversions._ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) // NOT USED - Originally intended for fault-tolerance // Metadata for a Kafka Stream that it sent to the Master +private[streaming] case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) // NOT USED - Originally intended for fault-tolerance // Checkpoint data specific to a KafkaInputDstream -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], +private[streaming] +case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) /** @@ -39,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], * By default the value is pulled from zookeper. * @param storageLevel RDD storage level. */ +private[streaming] class KafkaInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, host: String, @@ -98,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest]( } } +private[streaming] class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala index 996cc7dea8..aa2f31cea8 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -17,6 +17,7 @@ import java.util.concurrent.ArrayBlockingQueue * data into Spark Streaming, though it requires the sender to batch data and serialize it * in the format that the system is configured with. */ +private[streaming] class RawInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, host: String, @@ -29,6 +30,7 @@ class RawInputDStream[T: ClassManifest]( } } +private[streaming] class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala index 2686de14d2..d289ed2a3f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -11,6 +11,7 @@ import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import spark.streaming.{Interval, Time, DStream} +private[streaming] class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( parent: DStream[(K, V)], reduceFunc: (V, V) => V, diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index af5b73ae8d..cbe4372299 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -6,6 +6,7 @@ import spark.storage.StorageLevel import java.io._ import java.net.Socket +private[streaming] class SocketInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, host: String, @@ -19,7 +20,7 @@ class SocketInputDStream[T: ClassManifest]( } } - +private[streaming] class SocketReceiver[T: ClassManifest]( streamId: Int, host: String, @@ -50,7 +51,7 @@ class SocketReceiver[T: ClassManifest]( } - +private[streaming] object SocketReceiver { /** diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index 6e190b5564..175b3060c1 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -6,6 +6,7 @@ import spark.SparkContext._ import spark.storage.StorageLevel import spark.streaming.{Time, DStream} +private[streaming] class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( parent: DStream[(K, V)], updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala index f1efb2ae72..3bf4c2ecea 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala @@ -5,6 +5,7 @@ import spark.RDD import collection.mutable.ArrayBuffer import spark.rdd.UnionRDD +private[streaming] class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) extends DStream[T](parents.head.ssc) { diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala index 4b2621c497..7718794cbf 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala @@ -5,7 +5,7 @@ import spark.rdd.UnionRDD import spark.storage.StorageLevel import spark.streaming.{Interval, Time, DStream} - +private[streaming] class WindowedDStream[T: ClassManifest]( parent: DStream[T], _windowTime: Time, diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala index ed087e4ea8..974651f9f6 100644 --- a/streaming/src/main/scala/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala @@ -1,13 +1,12 @@ package spark.streaming.util -import spark.streaming._ - -trait Clock { +private[streaming] +trait Clock { def currentTime(): Long def waitTillTime(targetTime: Long): Long } - +private[streaming] class SystemClock() extends Clock { val minPollTime = 25L @@ -54,6 +53,7 @@ class SystemClock() extends Clock { } } +private[streaming] class ManualClock() extends Clock { var time = 0L diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala index dc55fd902b..2e7f4169c9 100644 --- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala @@ -1,5 +1,6 @@ package spark.streaming.util +private[streaming] class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { val minPollTime = 25L -- cgit v1.2.3 From 02497f0cd49a24ebc8b92d3471de250319fe56cd Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 1 Jan 2013 12:21:32 -0800 Subject: Updated Streaming Programming Guide. --- docs/_layouts/global.html | 12 +- docs/api.md | 5 +- docs/configuration.md | 11 ++ docs/streaming-programming-guide.md | 167 +++++++++++++++++++-- .../spark/streaming/util/RecurringTimer.scala | 1 + 5 files changed, 179 insertions(+), 17 deletions(-) (limited to 'streaming') diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index d656b3e3de..a8be52f23e 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -47,11 +47,19 @@
  • Quick Start
  • Scala
  • Java
  • -
  • Spark Streaming (Alpha)
  • +
  • Spark Streaming
  • -
  • API (Scaladoc)
  • +