path: root/streaming
author    Patrick Wendell <pwendell@gmail.com>  2013-01-02 14:08:15 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-01-02 14:08:15 -0800
commit    96a6ff0b09f276fb38656bb753592b1deeff5dd1 (patch)
tree      078a96a58c79ef2fd1051d44ba9aee26a7e249a8 /streaming
parent    493d65ce651dffc79adcdada0eeeed6452b3cc47 (diff)
parent    02497f0cd49a24ebc8b92d3471de250319fe56cd (diff)
Merge branch 'dev-merge' into datahandler-fix
Conflicts: streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 499
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 1
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala | 1
-rw-r--r--  streaming/src/main/scala/spark/streaming/Job.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 164
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala | 37
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala) | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FileInputDStream.scala) | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala) | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala | 17
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala) | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala) | 15
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/QueueInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/RawInputDStream.scala) | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala) | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala | 27
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/SocketInputDStream.scala) | 13
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala (renamed from streaming/src/main/scala/spark/streaming/StateDStream.scala) | 6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/WindowedDStream.scala) | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/Clock.scala | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 50
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 12
46 files changed, 721 insertions, 507 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 770f7b0cc0..11a7232d7b 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
+private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
@@ -30,6 +31,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
+private[streaming]
class CheckpointWriter(checkpointDir: String) extends Logging {
val file = new Path(checkpointDir, "graph")
val conf = new Configuration()
@@ -65,7 +67,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
}
-
+private[streaming]
object CheckpointReader extends Logging {
def read(path: String): Checkpoint = {
@@ -103,6 +105,7 @@ object CheckpointReader extends Logging {
}
}
+private[streaming]
class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
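The classes in this file gain a private[streaming] qualifier, hiding them from user code while keeping them visible anywhere inside the spark.streaming package. A minimal sketch of how Scala's package-qualified visibility behaves (the class and object names here are illustrative, not from this patch):

    package spark.streaming {
      private[streaming] class Hidden {
        def hello = "visible anywhere under spark.streaming"
      }
      object SamePackage {
        def ok = new Hidden().hello            // compiles: same package as Hidden
      }
    }

    package userapp {
      object Outside {
        // def bad = new spark.streaming.Hidden()  // would not compile: Hidden is private[streaming]
      }
    }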
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d5048aeed7..beba9cfd4f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -1,17 +1,15 @@
package spark.streaming
+import spark.streaming.dstream._
import StreamingContext._
import Time._
-import spark._
-import spark.SparkContext._
-import spark.rdd._
+import spark.{RDD, Logging}
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import java.util.concurrent.ArrayBlockingQueue
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
@@ -21,7 +19,7 @@ import org.apache.hadoop.conf.Configuration
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations
+ * HDFS, Kafka or Flume) or it can be generated by transforming existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
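A sketch of the abstraction described above, assuming the three-argument StreamingContext constructor (master, app name, batch duration) and a placeholder TCP source on localhost:9999:

    import spark.streaming.{Seconds, StreamingContext}

    // Each batch interval (here 1 second) the input DStream produces one RDD of lines,
    // and every derived DStream produces its RDD by transforming the parent's RDD.
    val ssc   = new StreamingContext("local[2]", "DStreamSketch", Seconds(1))
    val lines = ssc.networkTextStream("localhost", 9999)
    val sizes = lines.map(_.length)
    sizes.print()
    ssc.start()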
@@ -38,33 +36,28 @@ import org.apache.hadoop.conf.Configuration
* - A function that is used to generate an RDD after each time interval
*/
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-
-abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
-extends Serializable with Logging {
+abstract class DStream[T: ClassManifest] (
+ @transient protected[streaming] var ssc: StreamingContext
+ ) extends Serializable with Logging {
initLogging()
- /**
- * ----------------------------------------------
- * Methods that must be implemented by subclasses
- * ----------------------------------------------
- */
+ // =======================================================================
+ // Methods that should be implemented by subclasses of DStream
+ // =======================================================================
- // Time interval at which the DStream generates an RDD
+ /** Time interval after which the DStream generates a RDD */
def slideTime: Time
- // List of parent DStreams on which this DStream depends on
+ /** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
- // Key method that computes RDD for a valid time
+ /** Method that generates a RDD for the given time */
def compute (validTime: Time): Option[RDD[T]]
- /**
- * ---------------------------------------
- * Other general fields and methods of DStream
- * ---------------------------------------
- */
+ // =======================================================================
+ // Methods and fields available on all DStreams
+ // =======================================================================
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
@@ -87,12 +80,15 @@ extends Serializable with Logging {
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
- def isInitialized = (zeroTime != null)
+ protected[streaming] def isInitialized = (zeroTime != null)
// Duration for which the DStream requires its parent DStream to remember each RDD created
- def parentRememberDuration = rememberDuration
+ protected[streaming] def parentRememberDuration = rememberDuration
+
+ /** Returns the StreamingContext associated with this DStream */
+ def context() = ssc
- // Set caching level for the RDDs created by this DStream
+ /** Persists the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -102,11 +98,16 @@ extends Serializable with Logging {
this
}
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
-
- // Turn on the default caching level for this RDD
+
+ /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
def checkpoint(interval: Time): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
@@ -188,13 +189,13 @@ extends Serializable with Logging {
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
- metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
"delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
"the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
)
dependencies.foreach(_.validate())
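A sketch of the caching and checkpointing hooks documented above; `lines` stands for any DStream[String], and the storage level and interval are illustrative:

    import spark.storage.StorageLevel
    import spark.streaming.Seconds

    val cached = lines.cache()                                  // default level: MEMORY_ONLY_SER
    val spilly = lines.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

    // Must be set before the context starts; the validation above also checks that
    // spark.cleaner.delay is long enough for the RDDs this forces the DStream to remember.
    cached.checkpoint(Seconds(10))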
@@ -285,7 +286,7 @@ extends Serializable with Logging {
* Generates a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. PerRDDForEachDStream).
+ * (eg. ForEachDStream).
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -420,65 +421,96 @@ extends Serializable with Logging {
generatedRDDs = new HashMap[Time, RDD[T]] ()
}
- /**
- * --------------
- * DStream operations
- * --------------
- */
+ // =======================================================================
+ // DStream operations
+ // =======================================================================
+
+ /** Returns a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
+ /**
+ * Returns a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
+ /** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
def glom(): DStream[Array[T]] = new GlommedDStream(this)
- def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U: ClassManifest](
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean = false
+ ): DStream[U] = {
+ new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
}
- def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _)
-
- def collect(): DStream[Seq[T]] = this.map(x => (null, x)).groupByKey(1).map(_._2)
-
- def foreach(foreachFunc: T => Unit) {
- val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
- }
- def foreachRDD(foreachFunc: RDD[T] => Unit) {
- foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: RDD[T] => Unit) {
+ foreach((r: RDD[T], t: Time) => foreachFunc(r))
}
- def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ /**
+ * Applies a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
- def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transformRDD((r: RDD[T], t: Time) => transformFunc(r))
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transform((r: RDD[T], t: Time) => transformFunc(r))
}
- def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ /**
+ * Returns a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- def toBlockingQueue() = {
- val queue = new ArrayBlockingQueue[RDD[T]](10000)
- this.foreachRDD(rdd => {
- queue.add(rdd)
- })
- queue
- }
-
+ /**
+ * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and therefore materialized.
+ */
def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
@@ -489,18 +521,42 @@ extends Serializable with Logging {
if (first11.size > 10) println("...")
println()
}
- val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
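foreachRDD and transformRDD are renamed to foreach and transform in this patch. A usage sketch over a hypothetical DStream[String] named lines (rdd.sample here is the plain RDD API of this era):

    import spark.RDD
    import spark.streaming.Time

    val words  = lines.flatMap(_.split(" "))
    val longer = words.filter(_.length > 3)
    val counts = longer.count()                      // one-element RDD per batch

    // Output operator: registers the stream and materializes it every batch.
    counts.foreach((rdd: RDD[Int], time: Time) => println("batch " + time + ": " + rdd.first()))

    // Arbitrary per-batch RDD-to-RDD function.
    val sampled = words.transform(rdd => rdd.sample(false, 0.1, 42))
    sampled.print()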
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @return
+ */
def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowTime duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideTime sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
def window(windowTime: Time, slideTime: Time): DStream[T] = {
new WindowedDStream(this, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream which is computed based on a tumbling window over this DStream.
+ * This is equivalent to window(batchTime, batchTime).
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ */
def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ */
def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
this.window(windowTime, slideTime).reduce(reduceFunc)
}
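A sketch of the window operations documented above, assuming a one-second batch interval so the window and slide durations are valid multiples:

    import spark.streaming.{Seconds, Minutes}

    val recent    = lines.window(Seconds(30), Seconds(10))   // 30 s window, sliding every 10 s
    val perMinute = lines.tumble(Minutes(1))                  // non-overlapping 60 s windows

    // Longest line length observed in each 30 s window.
    val lengths = lines.map(_.length)
    val longest = lengths.reduceByWindow((a, b) => math.max(a, b), Seconds(30), Seconds(10))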
@@ -516,17 +572,31 @@ extends Serializable with Logging {
.map(_._2)
}
+ /**
+ * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowTime and slideTime are as defined in the
+ * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ */
def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
}
+ /**
+ * Returns a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ */
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
- def slice(interval: Interval): Seq[RDD[T]] = {
+ /**
+ * Returns all the RDDs defined by the Interval object (both end times included)
+ */
+ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
- // Get all the RDDs between fromTime to toTime (both included)
+ /**
+ * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
var time = toTime.floor(slideTime)
@@ -540,20 +610,26 @@ extends Serializable with Logging {
rdds.toSeq
}
+ /**
+ * Saves each RDD in this DStream as a Sequence file of serialized objects.
+ */
def saveAsObjectFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
+ /**
+ * Saves each RDD in this DStream as a text file, using string representation of elements.
+ */
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreach(saveFunc)
}
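A short sketch of the save operators above; the HDFS URLs are placeholders:

    // Writes one output per batch, named <prefix>-<batch time in ms>[.<suffix>].
    words.saveAsTextFiles("hdfs://namenode:9000/streams/words", "txt")
    lines.saveAsObjectFiles("hdfs://namenode:9000/streams/lines")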
def register() {
@@ -561,293 +637,6 @@ extends Serializable with Logging {
}
}
+private[streaming]
+case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
-
- override def dependencies = List()
-
- override def slideTime = {
- if (ssc == null) throw new Exception("ssc is null")
- if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
- ssc.graph.batchDuration
- }
-
- def start()
-
- def stop()
-}
-
-
-/**
- * TODO
- */
-
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- mapFunc: T => U
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.map[U](mapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- flatMapFunc: T => Traversable[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FilteredDStream[T: ClassManifest](
- parent: DStream[T],
- filterFunc: T => Boolean
- ) extends DStream[T](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- parent.getOrCompute(validTime).map(_.filter(filterFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
- extends DStream[Array[T]](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Array[T]]] = {
- parent.getOrCompute(validTime).map(_.glom())
- }
-}
-
-
-/**
- * TODO
- */
-
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: DStream[(K,V)],
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiner: (C, C) => C,
- partitioner: Partitioner
- ) extends DStream [(K,C)] (parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K,C)]] = {
- parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- mapValueFunc: V => U
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- flatMapValueFunc: V => TraversableOnce[U]
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
- }
-}
-
-
-
-/**
- * TODO
- */
-
-class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
- extends DStream[T](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideTime).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideTime: Time = parents.head.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- parents.map(_.getOrCompute(validTime)).foreach(_ match {
- case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
- })
- if (rdds.size > 0) {
- Some(new UnionRDD(ssc.sc, rdds))
- } else {
- None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerElementForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: T => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- val sparkJobFunc = {
- (iterator: Iterator[T]) => iterator.foreach(foreachFunc)
- }
- ssc.sc.runJob(rdd, sparkJobFunc)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class PerRDDForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- foreachFunc(rdd, time)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- transformFunc: (RDD[T], Time) => RDD[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(transformFunc(_, validTime))
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d0a9ade61d..c72429370e 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.InputDStream
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import collection.mutable.ArrayBuffer
import spark.Logging
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index ffb7725ac9..fa0b7ce19d 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,5 +1,6 @@
package spark.streaming
+private[streaming]
case class Interval(beginTime: Time, endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
diff --git a/streaming/src/main/scala/spark/streaming/Job.scala b/streaming/src/main/scala/spark/streaming/Job.scala
index 0bcb6fd8dc..67bd8388bc 100644
--- a/streaming/src/main/scala/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/spark/streaming/Job.scala
@@ -2,6 +2,7 @@ package spark.streaming
import java.util.concurrent.atomic.AtomicLong
+private[streaming]
class Job(val time: Time, func: () => _) {
val id = Job.getNewId()
def run(): Long = {
@@ -14,6 +15,7 @@ class Job(val time: Time, func: () => _) {
override def toString = "streaming job " + id + " @ " + time
}
+private[streaming]
object Job {
val id = new AtomicLong(0)
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..3b910538e0 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -5,6 +5,7 @@ import spark.SparkEnv
import java.util.concurrent.Executors
+private[streaming]
class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
@@ -13,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
try {
val timeTaken = job.run()
logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
- (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
+ (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b421f795ee..a6ab44271f 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
+import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
@@ -11,10 +13,10 @@ import akka.pattern.ask
import akka.util.duration._
import akka.dispatch._
-trait NetworkInputTrackerMessage
-case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
-case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
class NetworkInputTracker(
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 720e63bba0..b0a208e67f 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,6 +1,9 @@
package spark.streaming
import spark.streaming.StreamingContext._
+import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
+import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
@@ -218,13 +221,13 @@ extends Serializable {
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
- new MapValuesDStream[K, V, U](self, mapValuesFunc)
+ new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
- new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc)
+ new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
@@ -281,7 +284,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
@@ -303,7 +306,7 @@ extends Serializable {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
}
- self.foreachRDD(saveFunc)
+ self.foreach(saveFunc)
}
private def getKeyClass() = implicitly[ClassManifest[K]].erasure
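The renamed MapValuedDStream and FlatMapValuedDStream back the mapValues and flatMapValues operators, which become available on any DStream of pairs through the implicit conversion in the StreamingContext companion object. A sketch, where words is a hypothetical DStream[String]:

    import spark.streaming.StreamingContext._   // implicit toPairDStreamFunctions

    val pairs      = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)               // per-batch counts
    val doubled    = wordCounts.mapValues(_ * 2)            // MapValuedDStream underneath
    val expanded   = wordCounts.flatMapValues(n => 1 to n)  // FlatMapValuedDStream underneath
    wordCounts.print()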
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 014021be61..eb40affe6d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -4,14 +4,8 @@ import util.{ManualClock, RecurringTimer, Clock}
import spark.SparkEnv
import spark.Logging
-import scala.collection.mutable.HashMap
-
-
-sealed trait SchedulerMessage
-case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage
-
-class Scheduler(ssc: StreamingContext)
-extends Logging {
+private[streaming]
+class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
@@ -28,7 +22,7 @@ extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
def start() {
// If context was started from checkpoint, then restart timer such that
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ce47bcb2da..7256e41af9 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,10 @@
package spark.streaming
-import spark.RDD
-import spark.Logging
-import spark.SparkEnv
-import spark.SparkContext
+import spark.streaming.dstream._
+
+import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
+import spark.util.MetadataCleaner
import scala.collection.mutable.Queue
@@ -15,10 +15,8 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
-import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -48,7 +46,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Recreates the StreamingContext from a checkpoint file.
+ * Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -61,7 +59,7 @@ class StreamingContext private (
"both SparkContext and checkpoint as null")
}
- val isCheckpointPresent = (cp_ != null)
+ protected[streaming] val isCheckpointPresent = (cp_ != null)
val sc: SparkContext = {
if (isCheckpointPresent) {
@@ -71,9 +69,9 @@ class StreamingContext private (
}
}
- val env = SparkEnv.get
+ protected[streaming] val env = SparkEnv.get
- val graph: DStreamGraph = {
+ protected[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
@@ -86,10 +84,10 @@ class StreamingContext private (
}
}
- private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
- private[streaming] var networkInputTracker: NetworkInputTracker = null
+ protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ protected[streaming] var networkInputTracker: NetworkInputTracker = null
- private[streaming] var checkpointDir: String = {
+ protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
cp_.checkpointDir
@@ -98,18 +96,31 @@ class StreamingContext private (
}
}
- private[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
- private[streaming] var receiverJobThread: Thread = null
- private[streaming] var scheduler: Scheduler = null
+ protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+ protected[streaming] var receiverJobThread: Thread = null
+ protected[streaming] var scheduler: Scheduler = null
+ /**
+ * Sets each DStream in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
+ * collection. This method allows the developer to specify how long to remember the RDDs (
+ * if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
def remember(duration: Time) {
graph.remember(duration)
}
- def checkpoint(dir: String, interval: Time = null) {
- if (dir != null) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
- checkpointDir = dir
+ /**
+ * Sets the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ * @param interval checkpoint interval
+ */
+ def checkpoint(directory: String, interval: Time = null) {
+ if (directory != null) {
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+ checkpointDir = directory
checkpointInterval = interval
} else {
checkpointDir = null
@@ -117,16 +128,15 @@ class StreamingContext private (
}
}
- private[streaming] def getInitialCheckpoint(): Checkpoint = {
+ protected[streaming] def getInitialCheckpoint(): Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
- /**
+ /**
* Create an input stream that pulls messages from a Kafka Broker.
- *
- * @param host Zookeper hostname.
+ * @param hostname Zookeeper hostname.
* @param port Zookeeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -148,6 +158,15 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
def networkTextStream(
hostname: String,
port: Int,
@@ -156,6 +175,16 @@ class StreamingContext private (
networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as objects using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
def networkStream[T: ClassManifest](
hostname: String,
port: Int,
@@ -167,16 +196,32 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
inputStream
}
-
+ /**
+ * Create an input stream from a network source hostname:port, where data is received
+ * as serialized blocks (serialized using the Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
@@ -188,8 +233,12 @@ class StreamingContext private (
}
/**
- * This function creates a input stream that monitors a Hadoop-compatible filesystem
- * for new files and executes the necessary processing on them.
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new files
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
@@ -201,13 +250,23 @@ class StreamingContext private (
inputStream
}
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat).
+ * @param directory HDFS directory to monitor for new files
+ */
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
/**
- * This function create a input stream from an queue of RDDs. In each batch,
- * it will process either one or all of the RDDs returned by the queue
+ * Creates an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
+ * @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
@@ -219,34 +278,29 @@ class StreamingContext private (
inputStream
}
- def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
- val queue = new Queue[RDD[T]]
- val inputStream = queueStream(queue, true, null)
- queue ++= array
- inputStream
- }
-
+ /**
+ * Create a unified DStream from multiple DStreams of the same type and same interval
+ */
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
- * This function registers a InputDStream as an input stream that will be
- * started (InputDStream.start() called) to get the input data streams.
+ * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
graph.addInputStream(inputStream)
}
/**
- * This function registers a DStream as an output stream that will be
- * computed every interval.
+ * Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
}
- def validate() {
+ protected def validate() {
assert(graph != null, "Graph is null")
graph.validate()
@@ -258,7 +312,7 @@ class StreamingContext private (
}
/**
- * This function starts the execution of the streams.
+ * Starts the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointInterval == null && graph != null) {
@@ -286,7 +340,7 @@ class StreamingContext private (
}
/**
- * This function stops the execution of the streams.
+ * Stops the execution of the streams.
*/
def stop() {
try {
@@ -304,7 +358,11 @@ class StreamingContext private (
object StreamingContext {
- def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+ implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+ new PairDStreamFunctions[K, V](stream)
+ }
+
+ protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
@@ -314,13 +372,9 @@ object StreamingContext {
new SparkContext(master, frameworkName)
}
- implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
- new PairDStreamFunctions[K, V](stream)
- }
-
- def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+ protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
- time.millis.toString
+ time.milliseconds.toString
} else if (suffix == null || suffix.length ==0) {
prefix + "-" + time.milliseconds
} else {
@@ -328,7 +382,7 @@ object StreamingContext {
}
}
- def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
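A sketch tying the remaining pieces of this file together: a queue-backed test stream, union, and the start/stop lifecycle; the stream names assume the hypothetical `ssc` and `lines` used above:

    import scala.collection.mutable.Queue
    import spark.RDD

    val rddQueue = new Queue[RDD[String]]()
    val queued   = ssc.queueStream(rddQueue, true, null)   // one queued RDD per batch interval

    val merged = ssc.union(Seq(lines, queued))             // inputs must share the same slide time
    merged.print()

    ssc.start()   // starts receivers and begins generating jobs every batch interval
    // ... run for a while ...
    ssc.stop()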
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 480d292d7c..3c6fd5d967 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,11 +1,18 @@
package spark.streaming
-case class Time(millis: Long) {
+/**
+ * This is a simple class that represents time. Internally, it represents time as UTC.
+ * The recommended way to create instances of Time is to use helper objects
+ * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
+ * @param millis Time in UTC.
+ */
+
+case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
-
+
def <= (that: Time): Boolean = (this.millis <= that.millis)
-
+
def > (that: Time): Boolean = (this.millis > that.millis)
def >= (that: Time): Boolean = (this.millis >= that.millis)
@@ -15,7 +22,9 @@ case class Time(millis: Long) {
def - (that: Time): Time = Time(millis - that.millis)
def * (times: Int): Time = Time(millis * times)
-
+
+ def / (that: Time): Long = millis / that.millis
+
def floor(that: Time): Time = {
val t = that.millis
val m = math.floor(this.millis / t).toLong
@@ -38,23 +47,33 @@ case class Time(millis: Long) {
def milliseconds: Long = millis
}
-object Time {
+private[streaming] object Time {
val zero = Time(0)
implicit def toTime(long: Long) = Time(long)
-
- implicit def toLong(time: Time) = time.milliseconds
}
+/**
+ * Helper object that creates instance of [[spark.streaming.Time]] representing
+ * a given number of milliseconds.
+ */
object Milliseconds {
def apply(milliseconds: Long) = Time(milliseconds)
}
+/**
+ * Helper object that creates instance of [[spark.streaming.Time]] representing
+ * a given number of seconds.
+ */
object Seconds {
def apply(seconds: Long) = Time(seconds * 1000)
-}
+}
-object Minutes {
+/**
+ * Helper object that creates instance of [[spark.streaming.Time]] representing
+ * a given number of minutes.
+ */
+object Minutes {
def apply(minutes: Long) = Time(minutes * 60000)
}
diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index 61d088eddb..bc23d423d3 100644
--- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -1,8 +1,10 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
+import spark.streaming.{Time, DStream}
+private[streaming]
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
partitioner: Partitioner
diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
index 80150708fd..41c3af4694 100644
--- a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
@@ -1,6 +1,7 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
+import spark.streaming.{Time, StreamingContext}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 88856364d2..cf72095324 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -1,7 +1,8 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
+import spark.streaming.{StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
@@ -9,7 +10,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.HashSet
-
+private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
new file mode 100644
index 0000000000..1cbb4d536e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FilteredDStream[T: ClassManifest](
+ parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ parent.getOrCompute(validTime).map(_.filter(filterFunc))
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
new file mode 100644
index 0000000000..11ed8cf317
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ flatMapValueFunc: V => TraversableOnce[U]
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
new file mode 100644
index 0000000000..a13b4c9ff9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index 02d9811669..a6fa378d6e 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -1,17 +1,23 @@
-package spark.streaming
+package spark.streaming.dstream
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import spark.streaming.StreamingContext
+
+import spark.Utils
import spark.storage.StorageLevel
+
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
+
+import scala.collection.JavaConversions._
+
import java.net.InetSocketAddress
-import collection.JavaConversions._
-import spark.Utils
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+private[streaming]
class FlumeInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -79,7 +85,7 @@ class SparkFlumeEvent() extends Externalizable {
}
}
-object SparkFlumeEvent {
+private[streaming] object SparkFlumeEvent {
def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
@@ -88,6 +94,7 @@ object SparkFlumeEvent {
}
/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
@@ -103,12 +110,13 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
/** A NetworkReceiver which listens for events using the
* Flume Avro interface.*/
+private[streaming]
class FlumeReceiver(
- streamId: Int,
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+ streamId: Int,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
new file mode 100644
index 0000000000..41c629a225
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -0,0 +1,28 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Job, Time}
+
+private[streaming]
+class ForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
+ foreachFunc: (RDD[T], Time) => Unit
+ ) extends DStream[Unit](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ foreachFunc(rdd, time)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
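
ForEachDStream computes no RDD of its own; instead generateJob wraps the user function and this batch's parent RDD in a Job for the scheduler. It is reached through the foreach operation that the example programs later in this patch switch to; a minimal usage sketch, using only the DStream operations those examples already call (printMatches is a hypothetical helper):

    import spark.streaming.DStream

    // Sketch: foreach registers an output stream; each batch, ForEachDStream
    // wraps the closure below in a Job that the scheduler runs.
    object ForEachSketch {
      def printMatches(lines: DStream[String]) {
        lines.filter(_.contains("Alice")).count().foreach(r =>
          println("Grep count: " + r.collect().mkString))
      }
    }
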
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
new file mode 100644
index 0000000000..92ea503cae
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -0,0 +1,17 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class GlommedDStream[T: ClassManifest](parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Array[T]]] = {
+ parent.getOrCompute(validTime).map(_.glom())
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
new file mode 100644
index 0000000000..4959c66b06
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.streaming.{StreamingContext, DStream}
+
+abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+ extends DStream[T](ssc_) {
+
+ override def dependencies = List()
+
+ override def slideTime = {
+ if (ssc == null) throw new Exception("ssc is null")
+ if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
+ ssc.graph.batchDuration
+ }
+
+ def start()
+
+ def stop()
+}
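
InputDStream pins dependencies to Nil and the slide time to the graph's batch duration, leaving start, stop and compute to concrete sources. A hypothetical minimal subclass, shown only to illustrate the contract (ConstantRDDInputDStream is not part of this patch):

    package spark.streaming.dstream

    import spark.RDD
    import spark.streaming.{StreamingContext, Time}

    // Hypothetical source that returns the same RDD every batch interval.
    class ConstantRDDInputDStream[T: ClassManifest](
        @transient ssc_ : StreamingContext,
        rdd: RDD[T]
      ) extends InputDStream[T](ssc_) {

      override def start() {}  // nothing to set up

      override def stop() {}   // nothing to tear down

      override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
    }
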
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 66f60519bc..b1941fb427 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -1,30 +1,36 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.Logging
+import spark.storage.StorageLevel
+import spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
import java.util.Properties
import java.util.concurrent.Executors
+
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-import spark._
-import spark.RDD
-import spark.storage.StorageLevel
+
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that is sent to the Master
+private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
+private[streaming]
+case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
- * Input stream that pulls messages form a Kafka Broker.
+ * Input stream that pulls messages from a Kafka Broker.
*
* @param host Zookeeper hostname.
* @param port Zookeeper port.
@@ -35,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
* By default the value is pulled from Zookeeper.
* @param storageLevel RDD storage level.
*/
+private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -94,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest](
}
}
+private[streaming]
class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
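
KafkaReceiver is keyed by KafkaPartitionKey, and initialOffsets lets a caller resume consumption from known positions. A hedged sketch of the shape of that map; the broker, topic, group and partition values are made up for illustration:

    import spark.streaming.dstream.KafkaPartitionKey

    // Hedged sketch: the initial-offsets map accepted by the Kafka receiver.
    object KafkaOffsetsSketch {
      val initialOffsets: Map[KafkaPartitionKey, Long] = Map(
        KafkaPartitionKey(brokerId = 0, topic = "events", groupId = "spark-streaming", partId = 0) -> 0L,
        KafkaPartitionKey(brokerId = 0, topic = "events", groupId = "spark-streaming", partId = 1) -> 0L)
    }
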
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
new file mode 100644
index 0000000000..daf78c6893
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
new file mode 100644
index 0000000000..689caeef0e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ mapValueFunc: V => U
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
new file mode 100644
index 0000000000..786b9966f2
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MappedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.map[U](mapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 4e4e9fc942..41276da8bb 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -1,12 +1,13 @@
-package spark.streaming
+package spark.streaming.dstream
-import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver}
import spark.{Logging, SparkEnv, RDD}
import spark.rdd.BlockRDD
-import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
+import scala.collection.mutable.ArrayBuffer
+
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}
@@ -40,10 +41,10 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
}
-sealed trait NetworkReceiverMessage
-case class StopReceiver(msg: String) extends NetworkReceiverMessage
-case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
-case class ReportError(msg: String) extends NetworkReceiverMessage
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index bb86e51932..024bf3bea4 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext}
class QueueInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 6acaa9aab1..aa2f31cea8 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,12 +1,15 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.{DaemonThread, Logging}
+import spark.storage.StorageLevel
+import spark.streaming.StreamingContext
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
-import spark._
-import spark.storage.StorageLevel
+
/**
* An input stream that reads blocks of serialized objects from a given network address.
@@ -14,6 +17,7 @@ import spark.storage.StorageLevel
* data into Spark Streaming, though it requires the sender to batch data and serialize it
* in the format that the system is configured with.
*/
+private[streaming]
class RawInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -26,6 +30,7 @@ class RawInputDStream[T: ClassManifest](
}
}
+private[streaming]
class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index f63a9e0011..d289ed2a3f 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -1,17 +1,17 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.UnionRDD
import spark.rdd.CoGroupedRDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-import collection.SeqProxy
+import spark.streaming.{Interval, Time, DStream}
+private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
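
ReducedWindowedDStream backs the incremental form of reduceByKeyAndWindow, which needs both a reduce function and its inverse so that old batches can be subtracted out of the window. A sketch mirroring the TopKWordCountRaw example later in this patch; add and subtract are hypothetical helpers, and the pair-DStream implicit is assumed to come from StreamingContext._ as imported above:

    import spark.streaming.{DStream, Seconds}
    import spark.streaming.StreamingContext._

    // Sketch: incremental windowed reduce over per-key counts.
    object WindowedReduceSketch {
      def add(a: Long, b: Long) = a + b        // reduce function
      def subtract(a: Long, b: Long) = a - b   // its inverse

      def windowedCounts(counts: DStream[(String, Long)]): DStream[(String, Long)] =
        counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
    }
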
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
new file mode 100644
index 0000000000..6854bbe665
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -0,0 +1,27 @@
+package spark.streaming.dstream
+
+import spark.{RDD, Partitioner}
+import spark.SparkContext._
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: DStream[(K,V)],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner
+ ) extends DStream [(K,C)] (parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ parent.getOrCompute(validTime) match {
+ case Some(rdd) =>
+ Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case None => None
+ }
+ }
+}
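
ShuffledDStream applies combineByKey to each batch, so its four functional arguments mean the same things they do for RDDs. A hypothetical sketch wiring a per-key sum through it; spark.HashPartitioner is assumed to be available from core, and ShuffledDStreamSketch is not part of this patch:

    package spark.streaming.dstream

    import spark.HashPartitioner
    import spark.streaming.DStream

    // Hypothetical sketch: a per-key sum expressed through ShuffledDStream.
    object ShuffledDStreamSketch {
      def sumPerKey(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
        new ShuffledDStream[String, Int, Int](
          pairs,
          (v: Int) => v,                  // createCombiner: first value starts the sum
          (sum: Int, v: Int) => sum + v,  // mergeValue: fold a value into a partial sum
          (s1: Int, s2: Int) => s1 + s2,  // mergeCombiner: merge partial sums
          new HashPartitioner(2))         // partitioner for the shuffle
    }
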
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index f7a34d2515..8374f131d6 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -1,15 +1,12 @@
-package spark.streaming
+package spark.streaming.dstream
-import spark.streaming.util.{RecurringTimer, SystemClock}
+import spark.streaming.StreamingContext
import spark.storage.StorageLevel
import java.io._
import java.net.Socket
-import java.util.concurrent.ArrayBlockingQueue
-
-import scala.collection.mutable.ArrayBuffer
-import scala.Serializable
+private[streaming]
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
@@ -23,7 +20,7 @@ class SocketInputDStream[T: ClassManifest](
}
}
-
+private[streaming]
class SocketReceiver[T: ClassManifest](
streamId: Int,
host: String,
@@ -54,7 +51,7 @@ class SocketReceiver[T: ClassManifest](
}
-
+private[streaming]
object SocketReceiver {
/**
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b7e4c1c30c..175b3060c1 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -1,12 +1,12 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
-import spark.rdd.BlockRDD
import spark.Partitioner
-import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.streaming.{Time, DStream}
+private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
new file mode 100644
index 0000000000..0337579514
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ transformFunc: (RDD[T], Time) => RDD[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ }
+}
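
TransformedDStream hands each batch's RDD and its time to an arbitrary user function, making it the most general of the per-batch transformations in this patch. A hypothetical sketch that tags every record with the time of the batch it arrived in; TransformSketch is illustrative only:

    package spark.streaming.dstream

    import spark.RDD
    import spark.streaming.{DStream, Time}

    // Hypothetical sketch: an arbitrary per-batch RDD function via TransformedDStream.
    object TransformSketch {
      def tagWithBatchTime(lines: DStream[String]): DStream[(Long, String)] =
        new TransformedDStream[String, (Long, String)](
          lines,
          (rdd: RDD[String], time: Time) => rdd.map(s => (time.milliseconds, s)))
    }
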
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
new file mode 100644
index 0000000000..3bf4c2ecea
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -0,0 +1,40 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import collection.mutable.ArrayBuffer
+import spark.rdd.UnionRDD
+
+private[streaming]
+class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+ extends DStream[T](parents.head.ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different StreamingContexts")
+ }
+
+ if (parents.map(_.slideTime).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideTime: Time = parents.head.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val rdds = new ArrayBuffer[RDD[T]]()
+ parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ })
+ if (rdds.size > 0) {
+ Some(new UnionRDD(ssc.sc, rdds))
+ } else {
+ None
+ }
+ }
+}
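
The example programs later in this patch stop instantiating UnionDStream directly and go through the StreamingContext instead. A sketch of that usage, assuming ssc.union accepts a sequence of DStreams and returns a DStream, as the updated examples suggest:

    import spark.storage.StorageLevel
    import spark.streaming.{DStream, StreamingContext}

    // Sketch mirroring the updated raw-stream examples: union built via the context.
    object UnionSketch {
      def unionOfRawStreams(ssc: StreamingContext, port: Int, numStreams: Int): DStream[String] = {
        val streams = (1 to numStreams).map(_ =>
          ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2))
        ssc.union(streams)
      }
    }
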
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index e4d2a634f5..7718794cbf 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
+import spark.streaming.{Interval, Time, DStream}
-
+private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
_windowTime: Time,
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index 6cb2b4c042..dfaaf03f03 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -25,8 +25,8 @@ object GrepRaw {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = new UnionDStream(rawStreams)
- union.filter(_.contains("Alice")).count().foreachRDD(r =>
+ val union = ssc.union(rawStreams)
+ union.filter(_.contains("Alice")).count().foreach(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index fe4c2bf155..338834bc3c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -34,11 +34,11 @@ object TopKWordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
- val union = new UnionDStream(lines.toArray)
+ val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreachRDD(rdd => {
+ partialTopKWindowedCounts.foreach(rdd => {
val collectedCounts = rdd.collect
println("Collected " + collectedCounts.size + " words from partial top words")
println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index a29c81d437..d93335a8ce 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -33,10 +33,10 @@ object WordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
- val union = new UnionDStream(lines.toArray)
+ val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
+ windowedCounts.foreach(r => println("# unique words = " + r.count()))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 68be6b7893..a191321d91 100644
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -72,7 +72,7 @@ object PageViewStream {
case "popularUsersSeen" =>
// Look for users in our existing dataset and print it out if we have a match
pageViews.map(view => (view.userID, 1))
- .foreachRDD((rdd, time) => rdd.join(userList)
+ .foreach((rdd, time) => rdd.join(userList)
.map(_._2._2)
.take(10)
.foreach(u => println("Saw user %s at time %s".format(u, time))))
diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala
index ed087e4ea8..974651f9f6 100644
--- a/streaming/src/main/scala/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala
@@ -1,13 +1,12 @@
package spark.streaming.util
-import spark.streaming._
-
-trait Clock {
+private[streaming]
+trait Clock {
def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
-
+private[streaming]
class SystemClock() extends Clock {
val minPollTime = 25L
@@ -54,6 +53,7 @@ class SystemClock() extends Clock {
}
}
+private[streaming]
class ManualClock() extends Clock {
var time = 0L
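
The Clock trait is only two methods, which is what lets the scheduler run against either real time or the ManualClock used by the tests. A hypothetical third implementation, shown only to illustrate the contract (OffsetClock is not part of this patch):

    package spark.streaming.util

    // Hypothetical Clock that runs at a fixed offset from the system clock.
    private[streaming]
    class OffsetClock(offsetMs: Long) extends Clock {
      private val systemClock = new SystemClock()

      def currentTime(): Long = systemClock.currentTime() + offsetMs

      def waitTillTime(targetTime: Long): Long =
        systemClock.waitTillTime(targetTime - offsetMs) + offsetMs
    }
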
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index dc55fd902b..db715cc295 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -1,5 +1,6 @@
package spark.streaming.util
+private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L
@@ -53,6 +54,7 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
+private[streaming]
object RecurringTimer {
def main(args: Array[String]) {
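
RecurringTimer takes a Clock, a period in milliseconds and a callback that receives the time of each tick. A hedged construction sketch; starting and stopping the timer fall outside the hunk above, so they are deliberately omitted, and RecurringTimerSketch is not part of this patch:

    package spark.streaming.util

    // Hypothetical: a timer that would call back once per second of wall-clock time.
    private[streaming] object RecurringTimerSketch {
      def oneSecondTimer(): RecurringTimer =
        new RecurringTimer(new SystemClock(), 1000L,
          (time: Long) => println("tick at " + time))
    }
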
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0d82b2f1ea..920388bba9 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val stateStreamCheckpointInterval = Seconds(1)
// this ensures checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
val secondNumBatches = firstNumBatches
// Setup the streams
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 5b414117fc..4aa428bf64 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
// Get the output buffer
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
val output = outputStream.output
- val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong
+ val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
val startTime = System.currentTimeMillis()
try {
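
The test changes in this patch replace raw millisecond arithmetic with Time operations: dividing two Times serves as a batch count, and .milliseconds replaces the old .millis accessor. A hedged sketch of that arithmetic, assuming Time / Time yields a numeric value as the updated tests imply (TimeArithmeticSketch is illustrative only):

    import spark.streaming.{Seconds, Time}

    // Hedged sketch of the Time arithmetic used by the updated suites.
    object TimeArithmeticSketch {
      val batchDuration: Time = Seconds(1)
      val slideTime: Time = Seconds(2)

      // Mirrors WindowOperationsSuite: batches needed for 6 expected outputs.
      val numBatches = 6 * (slideTime / batchDuration).toInt

      // Mirrors FailureSuite: wait for 2 batches plus half a batch of slack.
      val waitTime = (batchDuration.milliseconds * 2.5).toLong
    }
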
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index ed9a659092..76b528bec3 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 8cc2f8ccfc..28bdd53c3c 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -1,12 +1,16 @@
package spark.streaming
+import spark.streaming.dstream.{InputDStream, ForEachDStream}
+import spark.streaming.util.ManualClock
+
import spark.{RDD, Logging}
-import util.ManualClock
+
import collection.mutable.ArrayBuffer
-import org.scalatest.FunSuite
import collection.mutable.SynchronizedBuffer
+
import java.io.{ObjectInputStream, IOException}
+import org.scalatest.FunSuite
/**
* This is an input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -35,7 +39,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
- extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {
@@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging {
def actuallyWait = false
+ /**
+ * Set up required DStreams to test the unary DStream operation using the given
+ * sequence of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
@@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging {
ssc
}
+ /**
+ * Set up required DStreams to test the binary DStream operation using the two
+ * sequences of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging {
output
}
+ /**
+ * Verify whether the output values produced by running a DStream operation
+ * are the same as the expected output values, by comparing the output
+ * collections either as lists (order matters) or as sets (order does not matter).
+ */
def verifyOutput[V: ClassManifest](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
@@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging {
logInfo("Output verified successfully")
}
+ /**
+ * Test a unary DStream operation with a list of inputs, running as many
+ * batches as there are expected output values.
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V](input, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test a unary DStream operation with a list of inputs
+ * @param input Sequence of input collections
+ * @param operation Unary DStream operation to be applied to the input
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order does not matter) or as lists (order matters)
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging {
verifyOutput[V](output, expectedOutput, useSet)
}
+ /**
+ * Test a binary DStream operation with two lists of inputs, running as many
+ * batches as there are expected output values.
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test binary DStream operation with two lists of inputs
+ * @param input1 First sequence of input collections
+ * @param input2 Second sequence of input collections
+ * @param operation Binary DStream operation to be applied to the 2 inputs
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order does not matter) or as lists (order matters)
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
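
Taken together, the documented helpers give tests a compact shape: build the inputs, describe the operation, list the expected per-batch outputs, and let testOperation run and verify them. A hedged sketch of a suite using the four-argument overload documented above (MapSketchSuite is illustrative, and the exact parameter list is assumed from the delegation shown in the hunk):

    package spark.streaming

    // Hedged sketch of a test built on TestSuiteBase; not part of this patch.
    class MapSketchSuite extends TestSuiteBase {
      test("map - doubling each element") {
        val input = Seq(Seq(1, 2), Seq(3), Seq(4, 5))
        val expectedOutput = Seq(Seq(2, 4), Seq(6), Seq(8, 10))
        val operation = (s: DStream[Int]) => s.map(_ * 2)
        // Runs as many batches as there are expected outputs, compares as lists.
        testOperation(input, operation, expectedOutput, false)
      }
    }
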
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 3e20e16708..4bc5229465 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.groupByKeyAndWindow(windowTime, slideTime)
.map(x => (x._1, x._2.toSet))
@@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
}
@@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("window - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
}
@@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindowInv - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
.persist()