 project/SparkBuild.scala                                    |  4 +++-
 streaming/src/main/scala/spark/streaming/DStream.scala      | 62 ++++++++-----
 streaming/src/main/scala/spark/streaming/StateDStream.scala |  1 -
 streaming/src/test/scala/spark/streaming/DStreamSuite.scala |  2 +-
 4 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6a60f10be4..358213fe64 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -84,7 +84,9 @@ object SparkBuild extends Build {
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
- def streamingSettings = sharedSettings ++ Seq(name := "spark-streaming")
+ def streamingSettings = sharedSettings ++ Seq(
+ name := "spark-streaming"
+ ) ++ assemblySettings ++ extraAssemblySettings
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
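
For context, this change wires the sbt-assembly plugin into the streaming project so that a fat jar can be built for it. A minimal standalone sketch of the same wiring, assuming the 0.x-era sbt-assembly plugin that SparkBuild already uses elsewhere (the project and directory names here are hypothetical):

    import sbt._
    import Keys._
    import sbtassembly.Plugin._
    import AssemblyKeys._

    object ExampleBuild extends Build {
      // Like spark-streaming after this patch: mix the plugin's assemblySettings
      // into the project's own settings so it participates in assembly builds.
      lazy val streaming = Project(
        "streaming",
        file("streaming"),
        settings = Defaults.defaultSettings ++ Seq(
          name := "spark-streaming",
          test in assembly := {}  // skip tests when building the fat jar
        ) ++ assemblySettings
      )
    }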
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 8c06345933..08eda056c9 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -12,7 +12,7 @@ import spark.Partitioner
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.ArrayBlockingQueue
abstract class DStream[T: ClassManifest] (@transient val ssc: StreamingContext)
extends Logging with Serializable {
@@ -166,15 +166,17 @@ extends Logging with Serializable {
def map[U: ClassManifest](mapFunc: T => U) = new MappedDStream(this, ssc.sc.clean(mapFunc))
- def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) =
+ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
+ }
def filter(filterFunc: T => Boolean) = new FilteredDStream(this, filterFunc)
def glom() = new GlommedDStream(this)
- def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) =
+ def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) = {
new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
+ }
def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2)
@@ -182,18 +184,30 @@ extends Logging with Serializable {
def collect() = this.map(x => (1, x)).groupByKey(1).map(_._2)
- def foreach(foreachFunc: T => Unit) = {
+ def foreach(foreachFunc: T => Unit) {
val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
- def foreachRDD(foreachFunc: RDD[T] => Unit) = {
+ def foreachRDD(foreachFunc: RDD[T] => Unit) {
+ foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+ }
+
+ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
+ def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transformRDD((r: RDD[T], t: Time) => transformFunc(r))
+ }
+
+ def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ new TransformedDStream(this, ssc.sc.clean(transformFunc))
+ }
+
private[streaming] def toQueue = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
@@ -361,15 +375,13 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- partitioner: Partitioner)
- extends DStream [(K,C)] (parent.ssc) {
+ partitioner: Partitioner
+ ) extends DStream [(K,C)] (parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
-
-
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) =>
@@ -385,7 +397,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
*/
class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]])
-extends DStream[T](parents(0).ssc) {
+ extends DStream[T](parents(0).ssc) {
if (parents.length == 0) {
throw new IllegalArgumentException("Empty array of parents")
@@ -424,8 +436,8 @@ extends DStream[T](parents(0).ssc) {
class PerElementForEachDStream[T: ClassManifest] (
parent: DStream[T],
- foreachFunc: T => Unit)
-extends DStream[Unit](parent.ssc) {
+ foreachFunc: T => Unit
+ ) extends DStream[Unit](parent.ssc) {
override def dependencies = List(parent)
@@ -455,11 +467,8 @@ extends DStream[Unit](parent.ssc) {
class PerRDDForEachDStream[T: ClassManifest] (
parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit)
-extends DStream[Unit](parent.ssc) {
-
- def this(parent: DStream[T], altForeachFunc: (RDD[T]) => Unit) =
- this(parent, (rdd: RDD[T], time: Time) => altForeachFunc(rdd))
+ foreachFunc: (RDD[T], Time) => Unit
+ ) extends DStream[Unit](parent.ssc) {
override def dependencies = List(parent)
@@ -478,3 +487,22 @@ extends DStream[Unit](parent.ssc) {
}
}
}
+
+
+/**
+ * TODO
+ */
+
+class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+ parent: DStream[T],
+ transformFunc: (RDD[T], Time) => RDD[U]
+ ) extends DStream[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(transformFunc(_, validTime))
+ }
+ }
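
Taken together, the overloads added above let an application both transform and act on each batch alongside its batch time. A minimal usage sketch, assuming a DStream[String] named `lines` obtained from a StreamingContext elsewhere (the variable names and the surrounding method are hypothetical):

    import spark.RDD
    import spark.streaming.{DStream, Time}

    // Hypothetical usage of the time-aware overloads added in this patch.
    def example(lines: DStream[String]) {
      // transformRDD: arbitrary per-batch RDD-to-RDD transformation, here
      // tagging every record with the time of the batch it belongs to.
      val tagged = lines.transformRDD { (rdd: RDD[String], time: Time) =>
        rdd.map(line => (time, line))
      }

      // foreachRDD: invoked on the driver once per batch; the count() action
      // inside runs on the cluster.
      tagged.foreachRDD { (rdd: RDD[(Time, String)], time: Time) =>
        println("Batch at " + time + ": " + rdd.count() + " records")
      }
    }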
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index eabb33d89e..f313d8c162 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -4,7 +4,6 @@ import spark.RDD
import spark.Partitioner
import spark.MapPartitionsRDD
import spark.SparkContext._
-import javax.annotation.Nullable
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
index d5eb20b37e..030f351080 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
@@ -105,7 +105,7 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
Seq(("a", 1), ("b", 1), ("c", 1)),
Seq(("a", 2), ("b", 2), ("c", 2)),
Seq(("a", 3), ("b", 3), ("c", 3))
- )//.map(array => array.toSeq.map(x => (x._1, new RichInt(x._2))))
+ )
val updateStateOp =(s: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: RichInt) => {
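
An update function with this signature typically folds each batch's new values for a key into that key's running state. A purely hypothetical sketch of such a body, not the test's actual code, assuming RichInt exposes its wrapped Int as `self` and that a null state marks a previously unseen key (as the S <: AnyRef bound in StateDStream suggests):

    // Hypothetical body for illustration only.
    val updateFunc = (values: Seq[Int], state: RichInt) => {
      val previous = if (state != null) state.self else 0  // null => new key
      new RichInt(previous + values.sum)
    }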