author     Tathagata Das <tathagata.das1565@gmail.com>  2012-12-30 15:19:55 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-12-30 15:19:55 -0800
commit     7e0271b4387eaf27cd96f3057ce2465b1271a480
tree       b02f4f29e1159729240078a6b90b3700ce540db9 /streaming
parent     9e644402c155b5fc68794a17c36ddd19d3242f4f
Refactored a whole lot to push all DStreams into the spark.streaming.dstream package.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 276
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 1
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 13
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala (renamed from streaming/src/main/scala/spark/streaming/DataHandler.scala) | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FileInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala) | 13
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala | 17
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala) | 14
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala | 21
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala | 20
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala) | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/QueueInputDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/RawInputDStream.scala) | 9
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala) | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala | 27
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/SocketInputDStream.scala) | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala (renamed from streaming/src/main/scala/spark/streaming/StateDStream.scala) | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala | 39
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala (renamed from streaming/src/main/scala/spark/streaming/WindowedDStream.scala) | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 48
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 12
40 files changed, 401 insertions, 333 deletions
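
The net effect for user code is an import-path change: the core abstractions (StreamingContext, DStream, Time) stay in spark.streaming, while the concrete DStream implementations move into spark.streaming.dstream. A minimal sketch of the change, assuming application code that referred to the moved classes directly (the class and package names come from the diffstat above; everything else is illustrative):

// Before this commit: concrete DStream classes sat next to the core API.
//   import spark.streaming.WindowedDStream
//   import spark.streaming.input.KafkaInputDStream
//
// After this commit: the core API keeps its package; implementations come from the sub-package.
import spark.streaming.{StreamingContext, DStream, Time}
import spark.streaming.dstream.WindowedDStream
import spark.streaming.dstream.KafkaInputDStream

Code that only goes through the methods on StreamingContext and DStream compiles unchanged, since those entry points do not move.
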
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3834b57ed3..292ad3b9f9 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -1,17 +1,15 @@
package spark.streaming
+import spark.streaming.dstream._
import StreamingContext._
import Time._
-import spark._
-import spark.SparkContext._
-import spark.rdd._
+import spark.{RDD, Logging}
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import java.util.concurrent.ArrayBlockingQueue
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
@@ -197,7 +195,7 @@ abstract class DStream[T: ClassManifest] (
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
"delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
"the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
)
dependencies.foreach(_.validate())
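
The warning in this hunk asks users to raise the metadata cleanup delay whenever a DStream must remember data for longer than the cleaner keeps it. A hedged sketch of the corresponding driver-side setting, using the property name quoted in the message above (the value is only an example and must be set before the context is created):

// 'spark.cleaner.delay' is expressed in minutes, matching the warning text above.
System.setProperty("spark.cleaner.delay", "10")   // illustrative value, not a recommendation
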
@@ -642,271 +640,3 @@ abstract class DStream[T: ClassManifest] (
private[streaming]
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
-
- override def dependencies = List()
-
- override def slideTime = {
- if (ssc == null) throw new Exception("ssc is null")
- if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
- ssc.graph.batchDuration
- }
-
- def start()
-
- def stop()
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- mapFunc: T => U
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.map[U](mapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- flatMapFunc: T => Traversable[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class FilteredDStream[T: ClassManifest](
- parent: DStream[T],
- filterFunc: T => Boolean
- ) extends DStream[T](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- parent.getOrCompute(validTime).map(_.filter(filterFunc))
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
- parent: DStream[T],
- mapPartFunc: Iterator[T] => Iterator[U],
- preservePartitioning: Boolean
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
- extends DStream[Array[T]](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Array[T]]] = {
- parent.getOrCompute(validTime).map(_.glom())
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
- parent: DStream[(K,V)],
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiner: (C, C) => C,
- partitioner: Partitioner
- ) extends DStream [(K,C)] (parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K,C)]] = {
- parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class MapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- mapValueFunc: V => U
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
- }
-}
-
-
-/**
- * TODO
- */
-private[streaming]
-class FlatMapValuesDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
- parent: DStream[(K, V)],
- flatMapValueFunc: V => TraversableOnce[U]
- ) extends DStream[(K, U)](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[(K, U)]] = {
- parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
- }
-}
-
-
-
-/**
- * TODO
- */
-
-class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
- extends DStream[T](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideTime).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideTime: Time = parents.head.slideTime
-
- override def compute(validTime: Time): Option[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- parents.map(_.getOrCompute(validTime)).foreach(_ match {
- case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
- })
- if (rdds.size > 0) {
- Some(new UnionRDD(ssc.sc, rdds))
- } else {
- None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class ForEachDStream[T: ClassManifest] (
- parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit
- ) extends DStream[Unit](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[Unit]] = None
-
- override def generateJob(time: Time): Option[Job] = {
- parent.getOrCompute(time) match {
- case Some(rdd) =>
- val jobFunc = () => {
- foreachFunc(rdd, time)
- }
- Some(new Job(time, jobFunc))
- case None => None
- }
- }
-}
-
-
-/**
- * TODO
- */
-
-private[streaming]
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
- parent: DStream[T],
- transformFunc: (RDD[T], Time) => RDD[U]
- ) extends DStream[U](parent.ssc) {
-
- override def dependencies = List(parent)
-
- override def slideTime: Time = parent.slideTime
-
- override def compute(validTime: Time): Option[RDD[U]] = {
- parent.getOrCompute(validTime).map(transformFunc(_, validTime))
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d0a9ade61d..c72429370e 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.InputDStream
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import collection.mutable.ArrayBuffer
import spark.Logging
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 658498dfc1..a6ab44271f 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
+import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index f9fef14196..b0a208e67f 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,6 +1,9 @@
package spark.streaming
import spark.streaming.StreamingContext._
+import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
+import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
+import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
@@ -218,13 +221,13 @@ extends Serializable {
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
- new MapValuesDStream[K, V, U](self, mapValuesFunc)
+ new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
def flatMapValues[U: ClassManifest](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
- new FlatMapValuesDStream[K, V, U](self, flatMapValuesFunc)
+ new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
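
The renames here (MapValuesDStream to MapValuedDStream, FlatMapValuesDStream to FlatMapValuedDStream) are internal: the public mapValues/flatMapValues operations on pair DStreams keep their signatures. For illustration, with a hypothetical pairs: DStream[(String, Int)] (pair operations are typically brought in via import spark.streaming.StreamingContext._, as this file does above):

// pairs: DStream[(String, Int)], obtained elsewhere in the application.
val doubled  = pairs.mapValues(_ * 2)              // now backed by MapValuedDStream
val expanded = pairs.flatMapValues(v => 1 to v)    // now backed by FlatMapValuedDStream
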
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index fd1fa77a24..aeb7c3eb0e 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -4,9 +4,6 @@ import util.{ManualClock, RecurringTimer, Clock}
import spark.SparkEnv
import spark.Logging
-import scala.collection.mutable.HashMap
-
-
private[streaming]
class Scheduler(ssc: StreamingContext) extends Logging {
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 998fea849f..ef73049a81 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,10 @@
package spark.streaming
-import spark.RDD
-import spark.Logging
-import spark.SparkEnv
-import spark.SparkContext
+import spark.streaming.dstream._
+
+import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
+import spark.util.MetadataCleaner
import scala.collection.mutable.Queue
@@ -18,7 +18,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
-import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -126,7 +125,7 @@ class StreamingContext private (
/**
* Create an input stream that pulls messages form a Kafka Broker.
*
- * @param host Zookeper hostname.
+ * @param hostname Zookeper hostname.
* @param port Zookeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -319,7 +318,7 @@ object StreamingContext {
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
- time.millis.toString
+ time.milliseconds.toString
} else if (suffix == null || suffix.length ==0) {
prefix + "-" + time.milliseconds
} else {
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 480d292d7c..2976e5e87b 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,6 +1,11 @@
package spark.streaming
-case class Time(millis: Long) {
+/**
+ * This class is simple wrapper class that represents time in UTC.
+ * @param millis Time in UTC long
+ */
+
+case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
@@ -15,7 +20,9 @@ case class Time(millis: Long) {
def - (that: Time): Time = Time(millis - that.millis)
def * (times: Int): Time = Time(millis * times)
-
+
+ def / (that: Time): Long = millis / that.millis
+
def floor(that: Time): Time = {
val t = that.millis
val m = math.floor(this.millis / t).toLong
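
With millis now private, callers can no longer write a.millis / b.millis; the new / operator divides two Time values directly and returns a Long, which is what the test changes later in this diff rely on. A small illustrative snippet (the values are arbitrary):

import spark.streaming.{Seconds, Time}

val windowTime: Time = Seconds(2)
val slideTime: Time  = Seconds(1)
val batchesPerWindow: Long = windowTime / slideTime   // 2, replacing windowTime.millis / slideTime.millis
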
diff --git a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index 61d088eddb..2e427dadf7 100644
--- a/streaming/src/main/scala/spark/streaming/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -1,7 +1,8 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
+import spark.streaming.{Time, DStream}
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
diff --git a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
index 80150708fd..41c3af4694 100644
--- a/streaming/src/main/scala/spark/streaming/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ConstantInputDStream.scala
@@ -1,6 +1,7 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
+import spark.streaming.{Time, StreamingContext}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
diff --git a/streaming/src/main/scala/spark/streaming/DataHandler.scala b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala
index 05f307a8d1..d737ba1ecc 100644
--- a/streaming/src/main/scala/spark/streaming/DataHandler.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala
@@ -1,4 +1,4 @@
-package spark.streaming
+package spark.streaming.dstream
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 88856364d2..8cdaff467b 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -1,7 +1,8 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
+import spark.streaming.{StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
new file mode 100644
index 0000000000..1cbb4d536e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FilteredDStream[T: ClassManifest](
+ parent: DStream[T],
+ filterFunc: T => Boolean
+ ) extends DStream[T](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ parent.getOrCompute(validTime).map(_.filter(filterFunc))
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
new file mode 100644
index 0000000000..11ed8cf317
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ flatMapValueFunc: V => TraversableOnce[U]
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
new file mode 100644
index 0000000000..a13b4c9ff9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ flatMapFunc: T => Traversable[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index 5ac7e5b08e..7e988cadf4 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -1,15 +1,20 @@
-package spark.streaming
+package spark.streaming.dstream
-import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import spark.streaming.StreamingContext
+
+import spark.Utils
import spark.storage.StorageLevel
+
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
+
+import scala.collection.JavaConversions._
+
import java.net.InetSocketAddress
-import collection.JavaConversions._
-import spark.Utils
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
class FlumeInputDStream[T: ClassManifest](
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
new file mode 100644
index 0000000000..41c629a225
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -0,0 +1,28 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Job, Time}
+
+private[streaming]
+class ForEachDStream[T: ClassManifest] (
+ parent: DStream[T],
+ foreachFunc: (RDD[T], Time) => Unit
+ ) extends DStream[Unit](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ foreachFunc(rdd, time)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
new file mode 100644
index 0000000000..92ea503cae
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -0,0 +1,17 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class GlommedDStream[T: ClassManifest](parent: DStream[T])
+ extends DStream[Array[T]](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Array[T]]] = {
+ parent.getOrCompute(validTime).map(_.glom())
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
new file mode 100644
index 0000000000..4959c66b06
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.streaming.{StreamingContext, DStream}
+
+abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
+ extends DStream[T](ssc_) {
+
+ override def dependencies = List()
+
+ override def slideTime = {
+ if (ssc == null) throw new Exception("ssc is null")
+ if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
+ ssc.graph.batchDuration
+ }
+
+ def start()
+
+ def stop()
+}
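
InputDStream is now a public class in the new package and remains the extension point for input sources that run on the driver: a subclass supplies start(), stop(), and compute(), while dependencies and slideTime come from this base class. A minimal hypothetical subclass, sketched against the API shown in this file (it produces no data and exists only to show the required overrides):

import spark.RDD
import spark.streaming.{StreamingContext, Time}
import spark.streaming.dstream.InputDStream

// Hypothetical do-nothing input stream, for illustration only.
class EmptyInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  override def start() {}    // open connections, start background threads, etc.
  override def stop() {}     // release whatever start() acquired

  override def compute(validTime: Time): Option[RDD[T]] = None   // no RDD for any batch
}
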
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 7c642d4802..a46721af2f 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -1,17 +1,21 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.Logging
+import spark.storage.StorageLevel
+import spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
import java.util.Properties
import java.util.concurrent.Executors
+
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-import spark._
-import spark.RDD
-import spark.storage.StorageLevel
+
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
@@ -24,7 +28,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
- * Input stream that pulls messages form a Kafka Broker.
+ * Input stream that pulls messages from a Kafka Broker.
*
* @param host Zookeper hostname.
* @param port Zookeper port.
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
new file mode 100644
index 0000000000..daf78c6893
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+ parent: DStream[T],
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
new file mode 100644
index 0000000000..689caeef0e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -0,0 +1,21 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import spark.SparkContext._
+
+private[streaming]
+class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+ parent: DStream[(K, V)],
+ mapValueFunc: V => U
+ ) extends DStream[(K, U)](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, U)]] = {
+ parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
new file mode 100644
index 0000000000..786b9966f2
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -0,0 +1,20 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+
+private[streaming]
+class MappedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ mapFunc: T => U
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.map[U](mapFunc))
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 4bf13dd50c..41276da8bb 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -1,12 +1,13 @@
-package spark.streaming
+package spark.streaming.dstream
-import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver}
import spark.{Logging, SparkEnv, RDD}
import spark.rdd.BlockRDD
-import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
+import scala.collection.mutable.ArrayBuffer
+
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
index bb86e51932..024bf3bea4 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
@@ -1,10 +1,11 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
+import spark.streaming.{Time, StreamingContext}
class QueueInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 6acaa9aab1..996cc7dea8 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,12 +1,15 @@
-package spark.streaming
+package spark.streaming.dstream
+
+import spark.{DaemonThread, Logging}
+import spark.storage.StorageLevel
+import spark.streaming.StreamingContext
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
-import spark._
-import spark.storage.StorageLevel
+
/**
* An input stream that reads blocks of serialized objects from a given network address.
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index f63a9e0011..2686de14d2 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -1,16 +1,15 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.rdd.UnionRDD
import spark.rdd.CoGroupedRDD
import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-import collection.SeqProxy
+import spark.streaming.{Interval, Time, DStream}
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
new file mode 100644
index 0000000000..6854bbe665
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -0,0 +1,27 @@
+package spark.streaming.dstream
+
+import spark.{RDD, Partitioner}
+import spark.SparkContext._
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: DStream[(K,V)],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner
+ ) extends DStream [(K,C)] (parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ parent.getOrCompute(validTime) match {
+ case Some(rdd) =>
+ Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index a9e37c0ff0..af5b73ae8d 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -1,14 +1,10 @@
-package spark.streaming
+package spark.streaming.dstream
-import spark.streaming.util.{RecurringTimer, SystemClock}
+import spark.streaming.StreamingContext
import spark.storage.StorageLevel
import java.io._
import java.net.Socket
-import java.util.concurrent.ArrayBlockingQueue
-
-import scala.collection.mutable.ArrayBuffer
-import scala.Serializable
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b7e4c1c30c..6e190b5564 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -1,11 +1,10 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
-import spark.rdd.BlockRDD
import spark.Partitioner
-import spark.rdd.MapPartitionsRDD
import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.streaming.{Time, DStream}
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
new file mode 100644
index 0000000000..0337579514
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -0,0 +1,19 @@
+package spark.streaming.dstream
+
+import spark.RDD
+import spark.streaming.{DStream, Time}
+
+private[streaming]
+class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ transformFunc: (RDD[T], Time) => RDD[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
new file mode 100644
index 0000000000..f1efb2ae72
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -0,0 +1,39 @@
+package spark.streaming.dstream
+
+import spark.streaming.{DStream, Time}
+import spark.RDD
+import collection.mutable.ArrayBuffer
+import spark.rdd.UnionRDD
+
+class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
+ extends DStream[T](parents.head.ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different StreamingContexts")
+ }
+
+ if (parents.map(_.slideTime).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideTime: Time = parents.head.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val rdds = new ArrayBuffer[RDD[T]]()
+ parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ })
+ if (rdds.size > 0) {
+ Some(new UnionRDD(ssc.sc, rdds))
+ } else {
+ None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index e4d2a634f5..4b2621c497 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -1,8 +1,9 @@
-package spark.streaming
+package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
+import spark.streaming.{Interval, Time, DStream}
class WindowedDStream[T: ClassManifest](
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index 7c4ee3b34c..dfaaf03f03 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -25,7 +25,7 @@ object GrepRaw {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = new UnionDStream(rawStreams)
+ val union = ssc.union(rawStreams)
union.filter(_.contains("Alice")).count().foreach(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 182dfd8a52..338834bc3c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -34,7 +34,7 @@ object TopKWordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
- val union = new UnionDStream(lines.toArray)
+ val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 9bcd30f4d7..d93335a8ce 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -33,7 +33,7 @@ object WordCountRaw {
val lines = (1 to numStreams).map(_ => {
ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
})
- val union = new UnionDStream(lines.toArray)
+ val union = ssc.union(lines)
val counts = union.mapPartitions(splitAndCountPartitions)
val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
windowedCounts.foreach(r => println("# unique words = " + r.count()))
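
All three examples switch from instantiating UnionDStream directly to StreamingContext.union, which keeps the example code independent of the class's new package. The two spellings side by side, assuming ssc and lines as in the examples above:

import spark.streaming.dstream.UnionDStream   // only needed for the direct form

val viaContext = ssc.union(lines)                  // preferred after this commit
val direct     = new UnionDStream(lines.toArray)   // still works, but ties the code to the dstream package
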
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0d82b2f1ea..920388bba9 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -42,7 +42,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val stateStreamCheckpointInterval = Seconds(1)
// this ensure checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
val secondNumBatches = firstNumBatches
// Setup the streams
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 5b414117fc..4aa428bf64 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -133,7 +133,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
// Get the output buffer
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
val output = outputStream.output
- val waitTime = (batchDuration.millis * (numBatches.toDouble + 0.5)).toLong
+ val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
val startTime = System.currentTimeMillis()
try {
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index ed9a659092..76b528bec3 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index a44f738957..28bdd53c3c 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -1,12 +1,16 @@
package spark.streaming
+import spark.streaming.dstream.{InputDStream, ForEachDStream}
+import spark.streaming.util.ManualClock
+
import spark.{RDD, Logging}
-import util.ManualClock
+
import collection.mutable.ArrayBuffer
-import org.scalatest.FunSuite
import collection.mutable.SynchronizedBuffer
+
import java.io.{ObjectInputStream, IOException}
+import org.scalatest.FunSuite
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -70,6 +74,10 @@ trait TestSuiteBase extends FunSuite with Logging {
def actuallyWait = false
+ /**
+ * Set up required DStreams to test the DStream operation using the two sequences
+ * of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
@@ -90,6 +98,10 @@ trait TestSuiteBase extends FunSuite with Logging {
ssc
}
+ /**
+ * Set up required DStreams to test the binary operation using the sequence
+ * of input collections.
+ */
def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -173,6 +185,11 @@ trait TestSuiteBase extends FunSuite with Logging {
output
}
+ /**
+ * Verify whether the output values after running a DStream operation
+ * is same as the expected output values, by comparing the output
+ * collections either as lists (order matters) or sets (order does not matter)
+ */
def verifyOutput[V: ClassManifest](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
@@ -199,6 +216,10 @@ trait TestSuiteBase extends FunSuite with Logging {
logInfo("Output verified successfully")
}
+ /**
+ * Test unary DStream operation with a list of inputs, with number of
+ * batches to run same as the number of expected output values
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -208,6 +229,15 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V](input, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test unary DStream operation with a list of inputs
+ * @param input Sequence of input collections
+ * @param operation Binary DStream operation to be applied to the 2 inputs
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order matters) or as lists (order does not matter)
+ */
def testOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
@@ -221,6 +251,10 @@ trait TestSuiteBase extends FunSuite with Logging {
verifyOutput[V](output, expectedOutput, useSet)
}
+ /**
+ * Test binary DStream operation with two lists of inputs, with number of
+ * batches to run same as the number of expected output values
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
@@ -231,6 +265,16 @@ trait TestSuiteBase extends FunSuite with Logging {
testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
}
+ /**
+ * Test binary DStream operation with two lists of inputs
+ * @param input1 First sequence of input collections
+ * @param input2 Second sequence of input collections
+ * @param operation Binary DStream operation to be applied to the 2 inputs
+ * @param expectedOutput Sequence of expected output collections
+ * @param numBatches Number of batches to run the operation for
+ * @param useSet Compare the output values with the expected output values
+ * as sets (order matters) or as lists (order does not matter)
+ */
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 3e20e16708..4bc5229465 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -209,7 +209,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.groupByKeyAndWindow(windowTime, slideTime)
.map(x => (x._1, x._2.toSet))
@@ -223,7 +223,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -233,7 +233,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowTime = Seconds(2)
val slideTime = Seconds(1)
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
}
@@ -251,7 +251,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("window - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -265,7 +265,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
}
@@ -281,7 +281,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideTime: Time = Seconds(1)
) {
test("reduceByKeyAndWindowInv - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime)
.persist()