Diffstat (limited to 'streaming/src')
-rw-r--r-- streaming/src/main/scala/spark/streaming/Checkpoint.scala | 5
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStream.scala | 517
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 1
-rw-r--r-- streaming/src/main/scala/spark/streaming/DataHandler.scala | 83
-rw-r--r-- streaming/src/main/scala/spark/streaming/Interval.scala | 1
-rw-r--r-- streaming/src/main/scala/spark/streaming/Job.scala | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/JobManager.scala | 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 10
-rw-r--r-- streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 11
-rw-r--r-- streaming/src/main/scala/spark/streaming/Scheduler.scala | 12
-rw-r--r-- streaming/src/main/scala/spark/streaming/StreamingContext.scala | 200
-rw-r--r-- streaming/src/main/scala/spark/streaming/Time.scala | 37
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala) | 4
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala) | 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FileInputDStream.scala) | 21
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala | 21
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala | 20
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala | 20
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala) | 38
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala | 28
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala | 17
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala | 19
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala) | 33
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala | 21
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala | 21
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala | 20
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala) | 90
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/QueueInputDStream.scala) | 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/RawInputDStream.scala) | 11
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala) | 6
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala | 27
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/SocketInputDStream.scala) | 21
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala (renamed from streaming/src/main/scala/spark/streaming/StateDStream.scala) | 6
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala | 19
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala | 40
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/WindowedDStream.scala) | 5
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/FileStream.scala | 46
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala | 75
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala | 43
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 33
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 69
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/QueueStream.scala | 39
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 49
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala | 25
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala | 25
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 43
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala | 85
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 84
-rw-r--r-- streaming/src/main/scala/spark/streaming/util/Clock.scala | 8
-rw-r--r-- streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala | 2
-rw-r--r-- streaming/src/test/resources/log4j.properties | 13
-rw-r--r-- streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/spark/streaming/FailureSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 3
-rw-r--r-- streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 50
-rw-r--r-- streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 12
56 files changed, 855 insertions, 1249 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 770f7b0cc0..11a7232d7b 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
+private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
@@ -30,6 +31,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
+private[streaming]
class CheckpointWriter(checkpointDir: String) extends Logging {
val file = new Path(checkpointDir, "graph")
val conf = new Configuration()
@@ -65,7 +67,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
}
-
+private[streaming]
object CheckpointReader extends Logging {
def read(path: String): Checkpoint = {
@@ -103,6 +105,7 @@ object CheckpointReader extends Logging {
}
}
+private[streaming]
class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 792c129be8..beba9cfd4f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -1,17 +1,15 @@
package spark.streaming
+import spark.streaming.dstream._
import StreamingContext._
import Time._
-import spark._
-import spark.SparkContext._
-import spark.rdd._
+import spark.{RDD, Logging}
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import java.util.concurrent.ArrayBlockingQueue
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
@@ -21,7 +19,7 @@ import org.apache.hadoop.conf.Configuration
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations
+ * HDFS, Kafka or Flume) or it can be generated by transforming existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
@@ -38,33 +36,28 @@ import org.apache.hadoop.conf.Configuration
* - A function that is used to generate an RDD after each time interval
*/
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-
-abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
-extends Serializable with Logging {
+abstract class DStream[T: ClassManifest] (
+ @transient protected[streaming] var ssc: StreamingContext
+ ) extends Serializable with Logging {
initLogging()
- /**
- * ----------------------------------------------
- * Methods that must be implemented by subclasses
- * ----------------------------------------------
- */
+ // =======================================================================
+ // Methods that should be implemented by subclasses of DStream
+ // =======================================================================
- // Time interval at which the DStream generates an RDD
+ /** Time interval after which the DStream generates a RDD */
def slideTime: Time
- // List of parent DStreams on which this DStream depends on
+ /** List of parent DStreams on which this DStream depends */
def dependencies: List[DStream[_]]
- // Key method that computes RDD for a valid time
+ /** Method that generates a RDD for the given time */
def compute (validTime: Time): Option[RDD[T]]
- /**
- * ---------------------------------------
- * Other general fields and methods of DStream
- * ---------------------------------------
- */
+ // =======================================================================
+ // Methods and fields available on all DStreams
+ // =======================================================================
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
@@ -87,12 +80,15 @@ extends Serializable with Logging {
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
- def isInitialized = (zeroTime != null)
+ protected[streaming] def isInitialized = (zeroTime != null)
// Duration for which the DStream requires its parent DStream to remember each RDD created
- def parentRememberDuration = rememberDuration
+ protected[streaming] def parentRememberDuration = rememberDuration
+
+ /** Returns the StreamingContext associated with this DStream */
+ def context() = ssc
- // Set caching level for the RDDs created by this DStream
+ /** Persists the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -102,11 +98,16 @@ extends Serializable with Logging {
this
}
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
-
- // Turn on the default caching level for this RDD
+
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
def checkpoint(interval: Time): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
@@ -188,13 +189,13 @@ extends Serializable with Logging {
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
- metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
"delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
"the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
)
dependencies.foreach(_.validate())
@@ -285,7 +286,7 @@ extends Serializable with Logging {
* Generates a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. PerRDDForEachDStream).
+ * (eg. ForEachDStream).
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -334,20 +335,22 @@ extends Serializable with Logging {
* this method to save custom checkpoint data.
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
-
logInfo("Updating checkpoint data for time " + currentTime)
// Get the checkpointed RDDs from the generated RDDs
- val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null)
- .map(x => (x._1, x._2.getCheckpointData()))
- // Make a copy of the existing checkpoint data
+ val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
+
+ // Make a copy of the existing checkpoint data (checkpointed RDDs)
val oldRdds = checkpointData.rdds.clone()
- // If the new checkpoint has checkpoints then replace existing with the new one
+
+ // If the new checkpoint data has checkpoints then replace existing with the new one
if (newRdds.size > 0) {
checkpointData.rdds.clear()
checkpointData.rdds ++= newRdds
}
- // Make dependencies update their checkpoint data
+
+ // Make parent DStreams update their checkpoint data
dependencies.foreach(_.updateCheckpointData(currentTime))
// TODO: remove this, this is just for debugging
@@ -381,9 +384,7 @@ extends Serializable with Logging {
checkpointData.rdds.foreach {
case(time, data) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.objectFile[T](data.toString)
- // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
- rdd.checkpointFile = data.toString
+ val rdd = ssc.sc.checkpointFile[T](data.toString)
generatedRDDs += ((time, rdd))
}
}
@@ -420,65 +421,96 @@ extends Serializable with Logging {
generatedRDDs = new HashMap[Time, RDD[T]] ()
}
- /**
- * --------------
- * DStream operations
- * --------------
- */
+ // =======================================================================
+ // DStream operations
+ // =======================================================================
+
+ /** Returns a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
+ /**
+ * Returns a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
+ /** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
def glom(): DStream[Array[T]] = new GlommedDStream(this)
- def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U: ClassManifest](
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean = false
+ ): DStream[U] = {
+ new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
}
- def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _)
-
- def collect(): DStream[Seq[T]] = this.map(x => (null, x)).groupByKey(1).map(_._2)
- def foreach(foreachFunc: T => Unit) {
- val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
- }
-
- def foreachRDD(foreachFunc: RDD[T] => Unit) {
- foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: RDD[T] => Unit) {
+ foreach((r: RDD[T], t: Time) => foreachFunc(r))
}
- def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
- def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transformRDD((r: RDD[T], t: Time) => transformFunc(r))
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transform((r: RDD[T], t: Time) => transformFunc(r))
}
- def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- def toBlockingQueue() = {
- val queue = new ArrayBlockingQueue[RDD[T]](10000)
- this.foreachRDD(rdd => {
- queue.add(rdd)
- })
- queue
- }
-
+ /**
+ * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and therefore materialized.
+ */
def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
@@ -489,18 +521,42 @@ extends Serializable with Logging {
if (first11.size > 10) println("...")
println()
}
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @return
+ */
def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowTime duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideTime sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
def window(windowTime: Time, slideTime: Time): DStream[T] = {
new WindowedDStream(this, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream which is computed based on a tumbling window on this DStream.
+ * This is equivalent to window(batchTime, batchTime).
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ */
def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ */
def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
this.window(windowTime, slideTime).reduce(reduceFunc)
}
@@ -516,17 +572,31 @@ extends Serializable with Logging {
.map(_._2)
}
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ */
def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ */
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
- def slice(interval: Interval): Seq[RDD[T]] = {
+ /**
+ * Returns all the RDDs defined by the Interval object (both end times included)
+ */
+ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
- // Get all the RDDs between fromTime to toTime (both included)
+ /**
+ * Returns all the RDDs between 'fromTime' and 'toTime' (both included)
+ */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
var time = toTime.floor(slideTime)
@@ -540,20 +610,26 @@ extends Serializable with Logging {
rdds.toSeq
}
+ /**
+ * Saves each RDD in this DStream as a Sequence file of serialized objects.
+ */
def saveAsObjectFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
+ /**
+ * Saves each RDD in this DStream as a text file, using string representation of elements.
+ */
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
def register() {
@@ -561,293 +637,6 @@ extends Serializable with Logging {
}
}
+private[streaming]
+case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
-
- override def dependencies = List()
-
- override def slideTime = {
- if (ssc == null) throw new Exception("ssc is null")
- if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
- ssc.graph.batchDuration
- }
-
- def start()
-
- def stop()
-}
-
-
-/**
- * TODO
- */
-
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- mapFunc: T => U
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.map[U](mapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- flatMapFunc: T => Traversable[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FilteredDStream[T: ClassManifest](
- parent: DStream[T],
- filterFunc: T => Boolean
- ) extends DStream[T](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- parent.getOrCompute(validTime).map(_.filter(filterFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
- extends DStream[Array[T]](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Array[T]]] = {
- parent.getOrCompute(validTime).map(_.glom())
- }
-}
-
-
-/**
- * TODO
- */
-
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: DStream[(K,V)],
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiner: (C, C) => C,
- partitioner: Partitioner
- ) extends DStream [(K,C)] (parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K,C)]] = {
- parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- mapValueFunc: V => U
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- flatMapValueFunc: V => TraversableOnce[U]
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
- }
-}
-
-
-
-/**
- * TODO
- */
-
-class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
- extends DStream[T](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideTime).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideTime: Time = parents.head.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- parents.map(_.getOrCompute(validTime)).foreach(_ match {
- case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
- })
- if (rdds.size > 0) {
- Some(new UnionRDD(ssc.sc, rdds))
- } else {
- None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerElementForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: T => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- val sparkJobFunc = {
- (iterator: Iterator[T]) => iterator.foreach(foreachFunc)
- }
- ssc.sc.runJob(rdd, sparkJobFunc)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerRDDForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- foreachFunc(rdd, time)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- transformFunc: (RDD[T], Time) => RDD[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(transformFunc(_, validTime))
- }
-}
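
For orientation, a minimal usage sketch (not part of this patch) of how the renamed DStream operations read after this change: transformRDD becomes transform and foreachRDD becomes foreach. The master URL, host and port are placeholder values, and the three-argument StreamingContext constructor is the one shown in the StreamingContext.scala hunk further below.

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._  // implicit conversion to PairDStreamFunctions

    object DStreamOpsSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "DStreamOpsSketch", Seconds(1))
        val lines = ssc.networkTextStream("localhost", 9999)

        // transform (formerly transformRDD): apply an arbitrary RDD-to-RDD function per batch
        val upper = lines.transform(rdd => rdd.map(_.toUpperCase))

        // foreach (formerly foreachRDD): output operator that registers the stream
        upper.window(Seconds(30), Seconds(10)).foreach((rdd, time) =>
          println("Window ending at " + time + " contained " + rdd.count() + " lines"))

        ssc.start()
      }
    }
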
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d0a9ade61d..c72429370e 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.InputDStream
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import collection.mutable.ArrayBuffer
import spark.Logging
diff --git a/streaming/src/main/scala/spark/streaming/DataHandler.scala b/streaming/src/main/scala/spark/streaming/DataHandler.scala
deleted file mode 100644
index 05f307a8d1..0000000000
--- a/streaming/src/main/scala/spark/streaming/DataHandler.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package spark.streaming
-
-import java.util.concurrent.ArrayBlockingQueue
-import scala.collection.mutable.ArrayBuffer
-import spark.Logging
-import spark.streaming.util.{RecurringTimer, SystemClock}
-import spark.storage.StorageLevel
-
-
-/**
- * This is a helper object that manages the data received from the socket. It divides
- * the object received into small batches of 100s of milliseconds, pushes them as
- * blocks into the block manager and reports the block IDs to the network input
- * tracker. It starts two threads, one to periodically start a new batch and prepare
- * the previous batch of as a block, the other to push the blocks into the block
- * manager.
- */
- class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
- extends Serializable with Logging {
-
- case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
-
- val clock = new SystemClock()
- val blockInterval = 200L
- val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
- val blockStorageLevel = storageLevel
- val blocksForPushing = new ArrayBlockingQueue[Block](1000)
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- var currentBuffer = new ArrayBuffer[T]
-
- def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
- new Block(blockId, iterator)
- }
-
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Data handler started")
- }
-
- def stop() {
- blockIntervalTimer.stop()
- blockPushingThread.interrupt()
- logInfo("Data handler stopped")
- }
-
- def += (obj: T) {
- currentBuffer += obj
- }
-
- def updateCurrentBuffer(time: Long) {
- try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[T]
- if (newBlockBuffer.size > 0) {
- val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval)
- val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
- blocksForPushing.add(newBlock)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block interval timer thread interrupted")
- case e: Exception =>
- receiver.stop()
- }
- }
-
- def keepPushingBlocks() {
- logInfo("Block pushing thread started")
- try {
- while(true) {
- val block = blocksForPushing.take()
- receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block pushing thread interrupted")
- case e: Exception =>
- receiver.stop()
- }
- }
- }
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index ffb7725ac9..fa0b7ce19d 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,5 +1,6 @@
package spark.streaming
+private[streaming]
case class Interval(beginTime: Time, endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala
index 0bcb6fd8dc..67bd8388bc 100644
--- a/streaming/src/main/scala/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/spark/streaming/Job.scala
@@ -2,6 +2,7 @@ package spark.streaming
import java.util.concurrent.atomic.AtomicLong
+private[streaming]
class Job(val time: Time, func: () => _) {
val id = Job.getNewId()
def run(): Long = {
@@ -14,6 +15,7 @@ class Job(val time: Time, func: () => _) {
override def toString = "streaming job " + id + " @ " + time
}
+private[streaming]
object Job {
val id = new AtomicLong(0)
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..3b910538e0 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -5,6 +5,7 @@ import spark.SparkEnv
import java.util.concurrent.Executors
+private[streaming]
class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
@@ -13,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
try {
val timeTaken = job.run()
logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b421f795ee..a6ab44271f 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
+import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
@@ -11,10 +13,10 @@ import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._
-trait NetworkInputTrackerMessage
-case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
-case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
class NetworkInputTracker(
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 720e63bba0..b0a208e67f 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,6 +1,9 @@
package spark.streaming
import spark.streaming.StreamingContext._
+import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
+import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
@@ -218,13 +221,13 @@ extends Serializable {
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
- new MapValuesDStream[K, V, U](self, mapValuesFunc)
+ new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
- new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc)
+ new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -281,7 +284,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
@@ -303,7 +306,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
private def getKeyClass() = implicitly[ClassManifest[K]].erasure
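
A brief hedged sketch of the pair-DStream operations touched in this file (mapValues and flatMapValues are now backed by MapValuedDStream and FlatMapValuedDStream, and the save operations call foreach instead of foreachRDD); `events` stands for an arbitrary DStream[(String, Int)] supplied by the caller.

    import spark.streaming.DStream
    import spark.streaming.StreamingContext._  // brings DStream[(K, V)] => PairDStreamFunctions into scope

    def pairOpsSketch(events: DStream[(String, Int)]) {
      // mapValues keeps the key; after this commit it builds a MapValuedDStream
      val doubled = events.mapValues(_ * 2)

      // flatMapValues expands each value into zero or more values (FlatMapValuedDStream)
      val expanded = events.flatMapValues(v => 1 to v)

      // print() is an output operation and registers the streams for materialization
      doubled.print()
      expanded.print()
    }
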
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 014021be61..eb40affe6d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -4,14 +4,8 @@ import util.{ManualClock, RecurringTimer, Clock}
import spark.SparkEnv
import spark.Logging
-import scala.collection.mutable.HashMap
-
-
-sealed trait SchedulerMessage
-case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage
-
-class Scheduler(ssc: StreamingContext)
-extends Logging {
+private[streaming]
+class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
@@ -28,7 +22,7 @@ extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
def start() {
// If context was started from checkpoint, then restart timer such that
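
The scheduler now drives its RecurringTimer off ssc.graph.batchDuration.milliseconds and picks its clock class from the 'spark.streaming.clock' system property. A hedged sketch of switching clocks (for example in a test) before the StreamingContext is created:

    // Select the clock used by the Scheduler's RecurringTimer. This must be set before
    // the StreamingContext (and therefore the Scheduler) is instantiated.
    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

    // The default, used when the property is not set:
    // System.setProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
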
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ce47bcb2da..215246ba2e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,10 @@
package spark.streaming
-import spark.RDD
-import spark.Logging
-import spark.SparkEnv
-import spark.SparkContext
+import spark.streaming.dstream._
+
+import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
+import spark.util.MetadataCleaner
import scala.collection.mutable.Queue
@@ -15,10 +15,8 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
-import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -48,7 +46,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Recreates the StreamingContext from a checkpoint file.
+ * Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -61,7 +59,7 @@ class StreamingContext private (
"both SparkContext and checkpoint as null")
}
- val isCheckpointPresent = (cp_ != null)
+ protected[streaming] val isCheckpointPresent = (cp_ != null)
val sc: SparkContext = {
if (isCheckpointPresent) {
@@ -71,9 +69,9 @@ class StreamingContext private (
}
}
- val env = SparkEnv.get
+ protected[streaming] val env = SparkEnv.get
- val graph: DStreamGraph = {
+ protected[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
@@ -86,10 +84,10 @@ class StreamingContext private (
}
}
- private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
- private[streaming] var networkInputTracker: NetworkInputTracker = null
+ protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ protected[streaming] var networkInputTracker: NetworkInputTracker = null
- private[streaming] var checkpointDir: String = {
+ protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
cp_.checkpointDir
@@ -98,18 +96,31 @@ class StreamingContext private (
}
}
- private[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
- private[streaming] var receiverJobThread: Thread = null
- private[streaming] var scheduler: Scheduler = null
+ protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+ protected[streaming] var receiverJobThread: Thread = null
+ protected[streaming] var scheduler: Scheduler = null
+ /**
+ * Sets each DStream in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
+ * collection. This method allows the developer to specify how long to remember the RDDs (
+ * if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
def remember(duration: Time) {
graph.remember(duration)
}
- def checkpoint(dir: String, interval: Time = null) {
- if (dir != null) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
- checkpointDir = dir
+ /**
+ * Sets the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ * @param interval checkpoint interval
+ */
+ def checkpoint(directory: String, interval: Time = null) {
+ if (directory != null) {
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+ checkpointDir = directory
checkpointInterval = interval
} else {
checkpointDir = null
@@ -117,16 +128,15 @@ class StreamingContext private (
}
}
- private[streaming] def getInitialCheckpoint(): Checkpoint = {
+ protected[streaming] def getInitialCheckpoint(): Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
- /**
+ /**
* Create an input stream that pulls messages from a Kafka Broker.
- *
- * @param host Zookeper hostname.
+ * @param hostname Zookeeper hostname.
* @param port Zookeeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -144,10 +154,19 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
+ /**
+ * Create an input stream from network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
def networkTextStream(
hostname: String,
port: Int,
@@ -156,6 +175,16 @@ class StreamingContext private (
networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
+ /**
+ * Create an input stream from network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as objects using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
def networkStream[T: ClassManifest](
hostname: String,
port: Int,
@@ -163,51 +192,103 @@ class StreamingContext private (
storageLevel: StorageLevel
): DStream[T] = {
val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
-
+ /**
+ * Create an input stream from network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
/**
- * This function creates a input stream that monitors a Hadoop-compatible filesystem
- * for new files and executes the necessary processing on them.
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
- ](directory: String): DStream[(K, V)] = {
+ ] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new files
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ F <: NewInputFormat[K, V]: ClassManifest
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ */
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
/**
- * This function create a input stream from an queue of RDDs. In each batch,
- * it will process either one or all of the RDDs returned by the queue
+ * Creates an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
+ * @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
@@ -215,38 +296,33 @@ class StreamingContext private (
defaultRDD: RDD[T] = null
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
- graph.addInputStream(inputStream)
- inputStream
- }
-
- def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
- val queue = new Queue[RDD[T]]
- val inputStream = queueStream(queue, true, null)
- queue ++= array
+ registerInputStream(inputStream)
inputStream
}
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same interval
+ */
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
- * This function registers a InputDStream as an input stream that will be
- * started (InputDStream.start() called) to get the input data streams.
+ * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
graph.addInputStream(inputStream)
}
/**
- * This function registers a DStream as an output stream that will be
- * computed every interval.
+ * Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
}
- def validate() {
+ protected def validate() {
assert(graph != null, "Graph is null")
graph.validate()
@@ -258,7 +334,7 @@ class StreamingContext private (
}
/**
- * This function starts the execution of the streams.
+ * Starts the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointInterval == null && graph != null) {
@@ -286,7 +362,7 @@ class StreamingContext private (
}
/**
- * This function stops the execution of the streams.
+ * Stops the execution of the streams.
*/
def stop() {
try {
@@ -304,7 +380,11 @@ class StreamingContext private (
object StreamingContext {
- def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+ implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ new PairDStreamFunctions[K, V](stream)
+ }
+
+ protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
@@ -314,13 +394,9 @@ object StreamingContext {
new SparkContext(master, frameworkName)
}
- implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
- new PairDStreamFunctions[K, V](stream)
- }
-
- def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+ protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
- time.millis.toString
+ time.milliseconds.toString
} else if (suffix == null || suffix.length ==0) {
prefix + "-" + time.milliseconds
} else {
@@ -328,7 +404,7 @@ object StreamingContext {
}
}
- def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
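
An end-to-end hedged sketch combining the APIs documented in this file (networkTextStream, checkpoint, the implicit toPairDStreamFunctions, and checkpoint-based re-creation); the master URL, address, and checkpoint directory are placeholders.

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._  // toPairDStreamFunctions for reduceByKey

    object NetworkWordCountSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "NetworkWordCountSketch", Seconds(1))
        ssc.checkpoint("hdfs:///tmp/streaming-checkpoint", Seconds(10))

        val lines = ssc.networkTextStream("localhost", 9999)
        val wordCounts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()

        // On restart, the context can be re-created from the checkpoint directory:
        // val recovered = new StreamingContext("hdfs:///tmp/streaming-checkpoint")
      }
    }
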
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 480d292d7c..3c6fd5d967 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,11 +1,18 @@
package spark.streaming
-case class Time(millis: Long) {
+/**
+ * This is a simple class that represents time. Internally, it represents time as UTC.
+ * The recommended way to create instances of Time is to use helper objects
+ * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
+ * @param millis Time in UTC.
+ */
+
+case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
-
+
def <= (that: Time): Boolean = (this.millis <= that.millis)
-
+
def > (that: Time): Boolean = (this.millis > that.millis)
def >= (that: Time): Boolean = (this.millis >= that.millis)
@@ -15,7 +22,9 @@ case class Time(millis: Long) {
def - (that: Time): Time = Time(millis - that.millis)
def * (times: Int): Time = Time(millis * times)
-
+
+ def / (that: Time): Long = millis / that.millis
+
def floor(that: Time): Time = {
val t = that.millis
val m = math.floor(this.millis / t).toLong
@@ -38,23 +47,33 @@ case class Time(millis: Long) {
def milliseconds: Long = millis
}
-object Time {
+private[streaming] object Time {
val zero = Time(0)
implicit def toTime(long: Long) = Time(long)
-
- implicit def toLong(time: Time) = time.milliseconds
}
+/**
+ * Helper object that creates an instance of [[spark.streaming.Time]] representing
+ * a given number of milliseconds.
+ */
object Milliseconds {
def apply(milliseconds: Long) = Time(milliseconds)
}
+/**
+ * Helper object that creates an instance of [[spark.streaming.Time]] representing
+ * a given number of seconds.
+ */
object Seconds {
def apply(seconds: Long) = Time(seconds * 1000)
-}
+}
-object Minutes {
+/**
+ * Helper object that creates an instance of [[spark.streaming.Time]] representing
+ * a given number of minutes.
+ */
+object Minutes {
def apply(minutes: Long) = Time(minutes * 60000)
}
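
A short sketch of the Time helpers and the '/' operator added above; the values in the comments follow from the definitions in this file.

    import spark.streaming.{Milliseconds, Seconds, Minutes, Time}

    object TimeSketch {
      val batch: Time = Seconds(2)                 // Time(2000)
      val window: Time = Minutes(1)                // Time(60000)

      val slide: Time = window - Seconds(50)       // Time(10000)
      val batchesPerWindow: Long = window / batch  // 30, via the new '/' operator
      val isSmaller: Boolean = Milliseconds(500) < batch  // true
    }
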
diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index 61d088eddb..bc23d423d3 100644
--- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -1,8 +1,10 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
+import spark.streaming.{Time, DStream}
+private[streaming]
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
partitioner: Partitioner
diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
index 80150708fd..41c3af4694 100644
--- a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
@@ -1,6 +1,7 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
+import spark.streaming.{Time, StreamingContext}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
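A hedged test-side sketch of how a constant input stream can be used; it assumes the constructor simply takes the StreamingContext and the RDD to replay, which is not visible in this hunk:

    // Replay the same RDD on every batch interval, which is handy in unit tests.
    val ssc   = new StreamingContext("local[2]", "ConstantInputSketch", Seconds(1))
    val rdd   = ssc.sc.makeRDD(1 to 100, 2)
    val input = new ConstantInputDStream(ssc, rdd)   // assumed (ssc, rdd) constructor
    input.count().print()
    ssc.start()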
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 88856364d2..1e6ad84b44 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -1,7 +1,8 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
+import spark.streaming.{StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
@@ -9,11 +10,11 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.HashSet
-
+private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
- filter: PathFilter = FileInputDStream.defaultPathFilter,
+ filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
@@ -59,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter.accept(path)) {
+ if (!filter(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
@@ -94,16 +95,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
+private[streaming]
object FileInputDStream {
- val defaultPathFilter = new PathFilter with Serializable {
- def accept(path: Path): Boolean = {
- val file = path.getName()
- if (file.startsWith(".") || file.endsWith("_tmp")) {
- return false
- } else {
- return true
- }
- }
- }
+ def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
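With the filter parameter now a plain Path => Boolean instead of a serializable PathFilter, a custom filter is just a function value. A sketch that only illustrates the new signature; the class is private[streaming], so application code would normally reach it through StreamingContext, and ssc is assumed to be a StreamingContext in scope:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Keep the default hidden-file rule and additionally require a ".log" suffix.
    val logsOnly: Path => Boolean =
      path => FileInputDStream.defaultFilter(path) && path.getName().endsWith(".log")

    val files = new FileInputDStream[LongWritable, Text, TextInputFormat](
      ssc, "hdfs://namenode:9000/logs", logsOnly)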
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
new file mode 100644
index 0000000000..1cbb4d536e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FilteredDStream[T: ClassManifest](
+ parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ parent.getOrCompute(validTime).map(_.filter(filterFunc))
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
new file mode 100644
index 0000000000..11ed8cf317
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ flatMapValueFunc: V => TraversableOnce[U]
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
new file mode 100644
index 0000000000..a13b4c9ff9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index 2959ce4540..ca70e72e56 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -1,17 +1,23 @@
-package spark.streaming
+package spark.streaming.dstream
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import spark.streaming.StreamingContext
+
+import spark.Utils
import spark.storage.StorageLevel
+
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
+
+import scala.collection.JavaConversions._
+
import java.net.InetSocketAddress
-import collection.JavaConversions._
-import spark.Utils
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+private[streaming]
class FlumeInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -79,7 +85,7 @@ class SparkFlumeEvent() extends Externalizable {
}
}
-object SparkFlumeEvent {
+private[streaming] object SparkFlumeEvent {
def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
@@ -88,41 +94,43 @@ object SparkFlumeEvent {
}
/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
- receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
Status.OK
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
- receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
}
/** A NetworkReceiver which listens for events using the
* Flume Avro interface.*/
+private[streaming]
class FlumeReceiver(
- streamId: Int,
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+ streamId: Int,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
- lazy val dataHandler = new DataHandler(this, storageLevel)
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
protected override def onStart() {
val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this));
val server = new NettyServer(responder, new InetSocketAddress(host, port));
- dataHandler.start()
+ blockGenerator.start()
server.start()
logInfo("Flume receiver started")
}
protected override def onStop() {
- dataHandler.stop()
+ blockGenerator.stop()
logInfo("Flume receiver stopped")
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
new file mode 100644
index 0000000000..41c629a225
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -0,0 +1,28 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Job, Time}
+
+private[streaming]
+class ForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
+ foreachFunc: (RDD[T], Time) => Unit
+ ) extends DStream[Unit](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ foreachFunc(rdd, time)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
new file mode 100644
index 0000000000..92ea503cae
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -0,0 +1,17 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class GlommedDStream[T: ClassManifest](parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Array[T]]] = {
+ parent.getOrCompute(validTime).map(_.glom())
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
new file mode 100644
index 0000000000..4959c66b06
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.streaming.{StreamingContext, DStream}
+
+abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+ extends DStream[T](ssc_) {
+
+ override def dependencies = List()
+
+ override def slideTime = {
+ if (ssc == null) throw new Exception("ssc is null")
+ if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
+ ssc.graph.batchDuration
+ }
+
+ def start()
+
+ def stop()
+}
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 7c642d4802..25988a2ce7 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -1,30 +1,36 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.Logging
+import spark.storage.StorageLevel
+import spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
import java.util.Properties
import java.util.concurrent.Executors
+
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-import spark._
-import spark.RDD
-import spark.storage.StorageLevel
+
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that it sent to the Master
+private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
 // Checkpoint data specific to a KafkaInputDStream
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
+private[streaming]
+case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
- * Input stream that pulls messages form a Kafka Broker.
+ * Input stream that pulls messages from a Kafka Broker.
*
 * @param host ZooKeeper hostname.
 * @param port ZooKeeper port.
@@ -35,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
 * By default the value is pulled from ZooKeeper.
* @param storageLevel RDD storage level.
*/
+private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -94,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest](
}
}
+private[streaming]
class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
@@ -102,20 +110,19 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
val ZK_TIMEOUT = 10000
// Handles pushing data into the BlockManager
- lazy protected val dataHandler = new DataHandler(this, storageLevel)
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
lazy val offsets = HashMap[KafkaPartitionKey, Long]()
// Connection to Kafka
var consumerConnector : ZookeeperConsumerConnector = null
def onStop() {
- dataHandler.stop()
+ blockGenerator.stop()
}
def onStart() {
- // Starting the DataHandler that buffers blocks and pushes them into them BlockManager
- dataHandler.start()
+ blockGenerator.start()
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -163,8 +170,8 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- stream.takeWhile { msgAndMetadata =>
- dataHandler += msgAndMetadata.message
+ stream.takeWhile { msgAndMetadata =>
+ blockGenerator += msgAndMetadata.message
        // Updating the offset. The key is (broker, topic, group, partition).
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
@@ -181,7 +188,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
// NOT USED - Originally intended for fault-tolerance
// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
- // extends DataHandler[Any](receiver, storageLevel) {
+ // extends BufferingBlockCreator[Any](receiver, storageLevel) {
// override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
// // Creates a new Block with Kafka-specific Metadata
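On the consumer side the Kafka stream is created through StreamingContext; a hedged sketch following the removed KafkaWordCount example, with placeholder ZooKeeper host, group and topic names, and assuming StreamingContext._ is imported for the pair operations:

    // topic -> number of consumer threads for that topic
    val topicMap = Map("pageviews" -> 2)
    val lines    = ssc.kafkaStream[String]("zkhost", 2181, "my-consumer-group", topicMap)
    lines.flatMap(_.split(" ")).map(w => (w, 1L)).reduceByKey(_ + _).print()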
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
new file mode 100644
index 0000000000..daf78c6893
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
new file mode 100644
index 0000000000..689caeef0e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ mapValueFunc: V => U
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
new file mode 100644
index 0000000000..786b9966f2
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MappedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.map[U](mapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 4e4e9fc942..18e62a0e33 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -1,18 +1,21 @@
-package spark.streaming
+package spark.streaming.dstream
-import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver}
import spark.{Logging, SparkEnv, RDD}
import spark.rdd.BlockRDD
-import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
+import scala.collection.mutable.ArrayBuffer
+
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}
import akka.pattern.ask
import akka.dispatch.Await
import akka.util.duration._
+import spark.streaming.util.{RecurringTimer, SystemClock}
+import java.util.concurrent.ArrayBlockingQueue
abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
@@ -40,10 +43,10 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
}
-sealed trait NetworkReceiverMessage
-case class StopReceiver(msg: String) extends NetworkReceiverMessage
-case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
-case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
@@ -153,4 +156,77 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
tracker ! DeregisterReceiver(streamId, msg)
}
}
+
+ /**
+   * Batches objects created by a [[spark.streaming.dstream.NetworkReceiver]] and puts them into
+   * appropriately named blocks at regular intervals. This class starts two threads,
+   * one to periodically start a new batch and prepare the previous batch as a block,
+   * the other to push the blocks into the block manager.
+ */
+ class BlockGenerator(storageLevel: StorageLevel)
+ extends Serializable with Logging {
+
+ case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+
+ val clock = new SystemClock()
+ val blockInterval = 200L
+ val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+ val blockStorageLevel = storageLevel
+ val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+ val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ var currentBuffer = new ArrayBuffer[T]
+
+ def start() {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Data handler started")
+ }
+
+ def stop() {
+ blockIntervalTimer.stop()
+ blockPushingThread.interrupt()
+ logInfo("Data handler stopped")
+ }
+
+ def += (obj: T) {
+ currentBuffer += obj
+ }
+
+ private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
+ new Block(blockId, iterator)
+ }
+
+ private def updateCurrentBuffer(time: Long) {
+ try {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[T]
+ if (newBlockBuffer.size > 0) {
+          val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
+ val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+ blocksForPushing.add(newBlock)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block interval timer thread interrupted")
+ case e: Exception =>
+ NetworkReceiver.this.stop()
+ }
+ }
+
+ private def keepPushingBlocks() {
+ logInfo("Block pushing thread started")
+ try {
+ while(true) {
+ val block = blocksForPushing.take()
+ NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block pushing thread interrupted")
+ case e: Exception =>
+ NetworkReceiver.this.stop()
+ }
+ }
+ }
}
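The receivers touched by this change all follow the same BlockGenerator lifecycle: start() from onStart, += for every received record, stop() from onStop. A hypothetical receiver showing just that pattern (DummyReceiver and its record loop are invented for illustration):

    private[streaming]
    class DummyReceiver(streamId: Int, storageLevel: StorageLevel)
      extends NetworkReceiver[String](streamId) {

      lazy val blockGenerator = new BlockGenerator(storageLevel)

      override def getLocationPreference = None

      protected override def onStart() {
        blockGenerator.start()                // starts the interval timer and the pushing thread
        for (i <- 1 to 1000) {
          blockGenerator += ("record-" + i)   // buffered, cut into a block every 200 ms
        }
      }

      protected override def onStop() {
        blockGenerator.stop()
      }
    }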
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index bb86e51932..024bf3bea4 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext}
class QueueInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 6acaa9aab1..aa2f31cea8 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,12 +1,15 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.{DaemonThread, Logging}
+import spark.storage.StorageLevel
+import spark.streaming.StreamingContext
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
-import spark._
-import spark.storage.StorageLevel
+
/**
* An input stream that reads blocks of serialized objects from a given network address.
@@ -14,6 +17,7 @@ import spark.storage.StorageLevel
* data into Spark Streaming, though it requires the sender to batch data and serialize it
* in the format that the system is configured with.
*/
+private[streaming]
class RawInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -26,6 +30,7 @@ class RawInputDStream[T: ClassManifest](
}
}
+private[streaming]
class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
extends NetworkReceiver[Any](streamId) {
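As with the other receivers, the raw stream is consumed through StreamingContext; a short sketch following the removed GrepRaw and WordCountRaw examples, with placeholder host and port and ssc assumed to be in scope:

    val raw = ssc.rawNetworkStream[String]("localhost", 33333, StorageLevel.MEMORY_ONLY_SER_2)
    raw.filter(_.contains("ERROR")).count().print()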
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index f63a9e0011..d289ed2a3f 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -1,17 +1,17 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.UnionRDD
import spark.rdd.CoGroupedRDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-import collection.SeqProxy
+import spark.streaming.{Interval, Time, DStream}
+private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
new file mode 100644
index 0000000000..6854bbe665
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -0,0 +1,27 @@
+package spark.streaming.dstream
+
+import spark.{RDD, Partitioner}
+import spark.SparkContext._
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: DStream[(K,V)],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner
+ ) extends DStream [(K,C)] (parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ parent.getOrCompute(validTime) match {
+ case Some(rdd) =>
+ Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index a9e37c0ff0..8e4b20ea4c 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -1,15 +1,12 @@
-package spark.streaming
+package spark.streaming.dstream
-import spark.streaming.util.{RecurringTimer, SystemClock}
+import spark.streaming.StreamingContext
import spark.storage.StorageLevel
import java.io._
import java.net.Socket
-import java.util.concurrent.ArrayBlockingQueue
-
-import scala.collection.mutable.ArrayBuffer
-import scala.Serializable
+private[streaming]
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -23,7 +20,7 @@ class SocketInputDStream[T: ClassManifest](
}
}
-
+private[streaming]
class SocketReceiver[T: ClassManifest](
streamId: Int,
host: String,
@@ -32,7 +29,7 @@ class SocketReceiver[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkReceiver[T](streamId) {
- lazy protected val dataHandler = new DataHandler(this, storageLevel)
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
override def getLocationPreference = None
@@ -40,21 +37,21 @@ class SocketReceiver[T: ClassManifest](
logInfo("Connecting to " + host + ":" + port)
val socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
- dataHandler.start()
+ blockGenerator.start()
val iterator = bytesToObjects(socket.getInputStream())
while(iterator.hasNext) {
val obj = iterator.next
- dataHandler += obj
+ blockGenerator += obj
}
}
protected def onStop() {
- dataHandler.stop()
+ blockGenerator.stop()
}
}
-
+private[streaming]
object SocketReceiver {
/**
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b7e4c1c30c..175b3060c1 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -1,12 +1,12 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
-import spark.rdd.BlockRDD
import spark.Partitioner
-import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.streaming.{Time, DStream}
+private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
new file mode 100644
index 0000000000..0337579514
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ transformFunc: (RDD[T], Time) => RDD[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
new file mode 100644
index 0000000000..3bf4c2ecea
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -0,0 +1,40 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import collection.mutable.ArrayBuffer
+import spark.rdd.UnionRDD
+
+private[streaming]
+class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+ extends DStream[T](parents.head.ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different StreamingContexts")
+ }
+
+ if (parents.map(_.slideTime).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideTime: Time = parents.head.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val rdds = new ArrayBuffer[RDD[T]]()
+ parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ })
+ if (rdds.size > 0) {
+ Some(new UnionRDD(ssc.sc, rdds))
+ } else {
+ None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index e4d2a634f5..7718794cbf 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
+import spark.streaming.{Interval, Time, DStream}
-
+private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
_windowTime: Time,
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
deleted file mode 100644
index 81938d30d4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-
-object FileStream {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: FileStream <master> <new HDFS compatible directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
-
- // Create the new directory
- val directory = new Path(args(1))
- val fs = directory.getFileSystem(new Configuration())
- if (fs.exists(directory)) throw new Exception("This directory already exists")
- fs.mkdirs(directory)
- fs.deleteOnExit(directory)
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val inputStream = ssc.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
-
- // Creating new files in the directory
- val text = "This is a text file"
- for (i <- 1 to 30) {
- ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10)
- .saveAsTextFile(new Path(directory, i.toString).toString)
- Thread.sleep(1000)
- }
- Thread.sleep(5000) // Waiting for the file to be processed
- ssc.stop()
- System.exit(0)
- }
-} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
deleted file mode 100644
index b7bc15a1d5..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-object FileStreamWithCheckpoint {
-
- def main(args: Array[String]) {
-
- if (args.size != 3) {
- println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>")
- println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>")
- System.exit(-1)
- }
-
- val directory = new Path(args(1))
- val checkpointDir = args(2)
-
- val ssc: StreamingContext = {
-
- if (args(0) == "restart") {
-
- // Recreated streaming context from specified checkpoint file
- new StreamingContext(checkpointDir)
-
- } else {
-
- // Create directory if it does not exist
- val fs = directory.getFileSystem(new Configuration())
- if (!fs.exists(directory)) fs.mkdirs(directory)
-
- // Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
- ssc_.checkpoint(checkpointDir)
-
- // Setup the streaming computation
- val inputStream = ssc_.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
-
- ssc_
- }
- }
-
- // Start the stream computation
- startFileWritingThread(directory.toString)
- ssc.start()
- }
-
- def startFileWritingThread(directory: String) {
-
- val fs = new Path(directory).getFileSystem(new Configuration())
-
- val fileWritingThread = new Thread() {
- override def run() {
- val r = new scala.util.Random()
- val text = "This is a sample text file with a random number "
- while(true) {
- val number = r.nextInt()
- val file = new Path(directory, number.toString)
- val fos = fs.create(file)
- fos.writeChars(text + number)
- fos.close()
- println("Created text file " + file)
- Thread.sleep(1000)
- }
- }
- }
- fileWritingThread.start()
- }
-
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
deleted file mode 100644
index e60ce483a3..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-
-/**
- * Produce a streaming count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: FlumeEventCount <master> <host> <port>
- *
- * <master> is a Spark master URL
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- */
-object FlumeEventCount {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println(
- "Usage: FlumeEventCount <master> <host> <port>")
- System.exit(1)
- }
-
- val Array(master, host, IntParam(port)) = args
-
- val batchInterval = Milliseconds(2000)
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
-
- // Create a flume stream
- val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
-
- // Print out the count of events received from this server in each batch
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
deleted file mode 100644
index 6cb2b4c042..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-object GrepRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context
- val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = new UnionDStream(rawStreams)
- union.filter(_.contains("Alice")).count().foreachRDD(r =>
- println("Grep count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
deleted file mode 100644
index fe55db6e2c..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package spark.streaming.examples
-
-import java.util.Properties
-import kafka.message.Message
-import kafka.producer.SyncProducerConfig
-import kafka.producer._
-import spark.SparkContext
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.storage.StorageLevel
-import spark.streaming.util.RawTextHelper._
-
-object KafkaWordCount {
- def main(args: Array[String]) {
-
- if (args.length < 6) {
- System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
- System.exit(1)
- }
-
- val Array(master, hostname, port, group, topics, numThreads) = args
-
- val sc = new SparkContext(master, "KafkaWordCount")
- val ssc = new StreamingContext(sc, Seconds(2))
- ssc.checkpoint("checkpoint")
-
- val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
- wordCounts.print()
-
- ssc.start()
- }
-}
-
-// Produces some random words between 1 and 100.
-object KafkaWordCountProducer {
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
- System.exit(1)
- }
-
- val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
-
- // Zookeper connection properties
- val props = new Properties()
- props.put("zk.connect", hostname + ":" + port)
- props.put("serializer.class", "kafka.serializer.StringEncoder")
-
- val config = new ProducerConfig(props)
- val producer = new Producer[String, String](config)
-
- // Send some messages
- while(true) {
- val messages = (1 to messagesPerSec.toInt).map { messageNum =>
- (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
- }.toArray
- println(messages.mkString(","))
- val data = new ProducerData[String, String](topic, messages)
- producer.send(data)
- Thread.sleep(100)
- }
- }
-
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
deleted file mode 100644
index 2a265d021d..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package spark.streaming.examples
-
-import spark.RDD
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-import scala.collection.mutable.SynchronizedQueue
-
-object QueueStream {
-
- def main(args: Array[String]) {
- if (args.length < 1) {
- System.err.println("Usage: QueueStream <master>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
-
- // Create the queue through which RDDs can be pushed to
- // a QueueInputDStream
- val rddQueue = new SynchronizedQueue[RDD[Int]]()
-
- // Create the QueueInputDStream and use it do some processing
- val inputStream = ssc.queueStream(rddQueue)
- val mappedStream = inputStream.map(x => (x % 10, 1))
- val reducedStream = mappedStream.reduceByKey(_ + _)
- reducedStream.print()
- ssc.start()
-
- // Create and push some RDDs into
- for (i <- 1 to 30) {
- rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
- Thread.sleep(1000)
- }
- ssc.stop()
- System.exit(0)
- }
-} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
deleted file mode 100644
index fe4c2bf155..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object TopKWordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- val k = 10
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- /*warmUp(ssc.sc)*/
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate top K words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = new UnionDStream(lines.toArray)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreachRDD(rdd => {
- val collectedCounts = rdd.collect
- println("Collected " + collectedCounts.size + " words from partial top words")
- println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
- })
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
deleted file mode 100644
index 867a8f42c4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountHdfs {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountHdfs <master> <directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val lines = ssc.textFileStream(args(1))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
deleted file mode 100644
index eadda60563..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountNetwork {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1")
- System.exit(1)
- }
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
-
- // Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
- val lines = ssc.networkTextStream(args(1), args(2).toInt)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
deleted file mode 100644
index a29c81d437..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object WordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate count of words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = new UnionDStream(lines.toArray)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
deleted file mode 100644
index 4c6e08bc74..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package spark.streaming.examples.clickstream
-
-import java.net.{InetAddress,ServerSocket,Socket,SocketException}
-import java.io.{InputStreamReader, BufferedReader, PrintWriter}
-import util.Random
-
-/** Represents a page view on a website with associated dimension data.*/
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
- override def toString() : String = {
- "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
- }
-}
-object PageView {
- def fromString(in : String) : PageView = {
- val parts = in.split("\t")
- new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
- }
-}
-
-/** Generates streaming events to simulate page views on a website.
- *
- * This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
- * */
-object PageViewGenerator {
- val pages = Map("http://foo.com/" -> .7,
- "http://foo.com/news" -> 0.2,
- "http://foo.com/contact" -> .1)
- val httpStatus = Map(200 -> .95,
- 404 -> .05)
- val userZipCode = Map(94709 -> .5,
- 94117 -> .5)
- val userID = Map((1 to 100).map(_ -> .01):_*)
-
-
- def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
- val rand = new Random().nextDouble()
- var total = 0.0
- for ((item, prob) <- inputMap) {
- total = total + prob
- if (total > rand) {
- return item
- }
- }
- return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
- }
-
- def getNextClickEvent() : String = {
- val id = pickFromDistribution(userID)
- val page = pickFromDistribution(pages)
- val status = pickFromDistribution(httpStatus)
- val zipCode = pickFromDistribution(userZipCode)
- new PageView(page, status, zipCode, id).toString()
- }
-
- def main(args : Array[String]) {
- if (args.length != 2) {
- System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
- System.exit(1)
- }
- val port = args(0).toInt
- val viewsPerSecond = args(1).toFloat
- val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
- val listener = new ServerSocket(port)
- println("Listening on port: " + port)
-
- while (true) {
- val socket = listener.accept()
- new Thread() {
- override def run = {
- println("Got client connected from: " + socket.getInetAddress)
- val out = new PrintWriter(socket.getOutputStream(), true)
-
- while (true) {
- Thread.sleep(sleepDelayMs)
- out.write(getNextClickEvent())
- out.flush()
- }
- socket.close()
- }
- }.start()
- }
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
deleted file mode 100644
index 68be6b7893..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package spark.streaming.examples.clickstream
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-import spark.SparkContext._
-
-/** Analyses a streaming dataset of web page views. This class demonstrates several types of
- * operators available in Spark streaming.
- *
- * This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
- * */
-object PageViewStream {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println("Usage: PageViewStream <metric> <host> <port>")
- System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
- " errorRatePerZipCode, activeUserCount, popularUsersSeen")
- System.exit(1)
- }
- val metric = args(0)
- val host = args(1)
- val port = args(2).toInt
-
- // Create the context
- val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
-
- // Create a NetworkInputDStream on target host:port and convert each line to a PageView
- val pageViews = ssc.networkTextStream(host, port)
- .flatMap(_.split("\n"))
- .map(PageView.fromString(_))
-
- // Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
-
- // Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
- .window(Seconds(10), Seconds(2))
- .countByKey()
-
-
- // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
- val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
- .map(view => ((view.zipCode, view.status)))
- .groupByKey()
- val errorRatePerZipCode = statusesPerZipCode.map{
- case(zip, statuses) =>
- val normalCount = statuses.filter(_ == 200).size
- val errorCount = statuses.size - normalCount
- val errorRatio = errorCount.toFloat / statuses.size
- if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
- else {"%s: %s".format(zip, errorRatio)}
- }
-
- // Return the number unique users in last 15 seconds
- val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
- .map(view => (view.userID, 1))
- .groupByKey()
- .count()
- .map("Unique active users: " + _)
-
- // An external dataset we want to join to this stream
- val userList = ssc.sc.parallelize(
- Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
-
- metric match {
- case "pageCounts" => pageCounts.print()
- case "slidingPageCounts" => slidingPageCounts.print()
- case "errorRatePerZipCode" => errorRatePerZipCode.print()
- case "activeUserCount" => activeUserCount.print()
- case "popularUsersSeen" =>
- // Look for users in our existing dataset and print it out if we have a match
- pageViews.map(view => (view.userID, 1))
- .foreachRDD((rdd, time) => rdd.join(userList)
- .map(_._2._2)
- .take(10)
- .foreach(u => println("Saw user %s at time %s".format(u, time))))
- case _ => println("Invalid metric entered: " + metric)
- }
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala
index ed087e4ea8..974651f9f6 100644
--- a/streaming/src/main/scala/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala
@@ -1,13 +1,12 @@
package spark.streaming.util
-import spark.streaming._
-
-trait Clock {
+private[streaming]
+trait Clock {
def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
-
+private[streaming]
class SystemClock() extends Clock {
val minPollTime = 25L
@@ -54,6 +53,7 @@ class SystemClock() extends Clock {
}
}
+private[streaming]
class ManualClock() extends Clock {
var time = 0L
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index dc55fd902b..db715cc295 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -1,5 +1,6 @@
package spark.streaming.util
+private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L
@@ -53,6 +54,7 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
+private[streaming]
object RecurringTimer {
def main(args: Array[String]) {
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 02fe16866e..edfa1243fa 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,8 +1,11 @@
-# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0d82b2f1ea..920388bba9 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val stateStreamCheckpointInterval = Seconds(1)
   // this ensures checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
val secondNumBatches = firstNumBatches
// Setup the streams
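// The updated tests above and below divide one Time by another directly (for
// example `stateStreamCheckpointInterval / batchDuration`) instead of dividing raw
// millisecond fields. A minimal sketch of such an operator, assuming Time wraps a
// millisecond count; SimpleTime and SimpleSeconds are illustrative names, not the
// real Time.scala, which is outside this excerpt.
case class SimpleTime(milliseconds: Long) {
  // number of `that`-sized intervals that fit into this interval
  def / (that: SimpleTime): Long = this.milliseconds / that.milliseconds
}
object SimpleSeconds {
  def apply(seconds: Long) = SimpleTime(seconds * 1000)
}
// e.g. with a 200 ms batch duration and a 1 s checkpoint interval:
//   (SimpleSeconds(1) / SimpleTime(200)) * 2   // == 10 batches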
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 5b414117fc..4aa428bf64 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
// Get the output buffer
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
val output = outputStream.output
- val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong
+ val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
val startTime = System.currentTimeMillis()
try {
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index ed9a659092..e71ba6ddc1 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
@@ -317,7 +318,7 @@ class TestServer(port: Int) extends Logging {
}
}
} catch {
- case e: SocketException => println(e)
+ case e: SocketException => logError("TestServer error", e)
} finally {
logInfo("Connection closed")
if (!clientSocket.isClosed) clientSocket.close()
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 8cc2f8ccfc..28bdd53c3c 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -1,12 +1,16 @@
package spark.streaming
+import spark.streaming.dstream.{InputDStream, ForEachDStream}
+import spark.streaming.util.ManualClock
+
import spark.{RDD, Logging}
-import util.ManualClock
+
import collection.mutable.ArrayBuffer
-import org.scalatest.FunSuite
import collection.mutable.SynchronizedBuffer
+
import java.io.{ObjectInputStream, IOException}
+import org.scalatest.FunSuite
/**
 * This is an input stream just for the test suites. This is equivalent to a checkpointable,
@@ -35,7 +39,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
- extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {
@@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging {
def actuallyWait = false
+ /**
+   * Set up required DStreams to test the DStream operation using the sequence
+ * of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
@@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging {
ssc
}
+ /**
+   * Set up required DStreams to test the binary operation using the two sequences
+ * of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging {
output
}
+ /**
+ * Verify whether the output values after running a DStream operation
+   * are the same as the expected output values, by comparing the output
+ * collections either as lists (order matters) or sets (order does not matter)
+ */
def verifyOutput[V: ClassManifest](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
@@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging {
logInfo("Output verified successfully")
}
+ /**
+   * Test unary DStream operation with a list of inputs, with the number of
+   * batches to run equal to the number of expected output values
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V](input, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test unary DStream operation with a list of inputs
+ * @param input Sequence of input collections
+   * @param operation Unary DStream operation to be applied to the input
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+   * as sets (order does not matter) or as lists (order matters)
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging {
verifyOutput[V](output, expectedOutput, useSet)
}
+ /**
+   * Test binary DStream operation with two lists of inputs, with the number of
+   * batches to run equal to the number of expected output values
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test binary DStream operation with two lists of inputs
+ * @param input1 First sequence of input collections
+ * @param input2 Second sequence of input collections
+ * @param operation Binary DStream operation to be applied to the 2 inputs
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+   * as sets (order does not matter) or as lists (order matters)
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
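// Usage sketch for the testOperation helper documented above (the suite name and
// the operation are illustrative, not part of this patch). Each inner Seq is one
// batch of input; one output batch is expected per input batch, and useSet = false
// compares each output batch to the expected batch as an ordered list.
package spark.streaming   // assumed, so that TestSuiteBase and DStream resolve

class MapOperationSuite extends TestSuiteBase {
  test("map") {
    val input = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
    val expectedOutput = Seq(Seq(2, 4, 6), Seq(8, 10, 12))
    val operation = (s: DStream[Int]) => s.map(_ * 2)
    // Runs expectedOutput.size batches (this overload forwards numBatches = -1)
    testOperation(input, operation, expectedOutput, false)
  }
}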
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 3e20e16708..4bc5229465 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.groupByKeyAndWindow(windowTime, slideTime)
.map(x => (x._1, x._2.toSet))
@@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
}
@@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("window - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
}
@@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindowInv - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
.persist()