diff options
4 files changed, 49 insertions, 20 deletions
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6a60f10be4..358213fe64 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -84,7 +84,9 @@ object SparkBuild extends Build { def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") - def streamingSettings = sharedSettings ++ Seq(name := "spark-streaming") + def streamingSettings = sharedSettings ++ Seq( + name := "spark-streaming" + ) ++ assemblySettings ++ extraAssemblySettings def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 8c06345933..08eda056c9 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -12,7 +12,7 @@ import spark.Partitioner import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.ArrayBlockingQueue abstract class DStream[T: ClassManifest] (@transient val ssc: StreamingContext) extends Logging with Serializable { @@ -166,15 +166,17 @@ extends Logging with Serializable { def map[U: ClassManifest](mapFunc: T => U) = new MappedDStream(this, ssc.sc.clean(mapFunc)) - def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) = + def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) = { new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) + } def filter(filterFunc: T => Boolean) = new FilteredDStream(this, filterFunc) def glom() = new GlommedDStream(this) - def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) = + def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) = { new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc)) + } def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2) @@ -182,18 +184,30 @@ extends Logging with Serializable { def collect() = this.map(x => (1, x)).groupByKey(1).map(_._2) - def foreach(foreachFunc: T => Unit) = { + def foreach(foreachFunc: T => Unit) { val newStream = new PerElementForEachDStream(this, ssc.sc.clean(foreachFunc)) ssc.registerOutputStream(newStream) newStream } - def foreachRDD(foreachFunc: RDD[T] => Unit) = { + def foreachRDD(foreachFunc: RDD[T] => Unit) { + foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) + } + + def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { val newStream = new PerRDDForEachDStream(this, ssc.sc.clean(foreachFunc)) ssc.registerOutputStream(newStream) newStream } + def transformRDD[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + transformRDD((r: RDD[T], t: Time) => transformFunc(r)) + } + + def transformRDD[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + new TransformedDStream(this, ssc.sc.clean(transformFunc)) + } + private[streaming] def toQueue = { val queue = new ArrayBlockingQueue[RDD[T]](10000) this.foreachRDD(rdd => { @@ -361,15 +375,13 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, - partitioner: Partitioner) - extends DStream [(K,C)] (parent.ssc) { + partitioner: Partitioner + ) extends DStream [(K,C)] (parent.ssc) { override def dependencies = List(parent) override def slideTime: Time = parent.slideTime - - override def compute(validTime: Time): Option[RDD[(K,C)]] = { parent.getOrCompute(validTime) match { case Some(rdd) => @@ -385,7 +397,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( */ class UnifiedDStream[T: ClassManifest](parents: Array[DStream[T]]) -extends DStream[T](parents(0).ssc) { + extends DStream[T](parents(0).ssc) { if (parents.length == 0) { throw new IllegalArgumentException("Empty array of parents") @@ -424,8 +436,8 @@ extends DStream[T](parents(0).ssc) { class PerElementForEachDStream[T: ClassManifest] ( parent: DStream[T], - foreachFunc: T => Unit) -extends DStream[Unit](parent.ssc) { + foreachFunc: T => Unit + ) extends DStream[Unit](parent.ssc) { override def dependencies = List(parent) @@ -455,11 +467,8 @@ extends DStream[Unit](parent.ssc) { class PerRDDForEachDStream[T: ClassManifest] ( parent: DStream[T], - foreachFunc: (RDD[T], Time) => Unit) -extends DStream[Unit](parent.ssc) { - - def this(parent: DStream[T], altForeachFunc: (RDD[T]) => Unit) = - this(parent, (rdd: RDD[T], time: Time) => altForeachFunc(rdd)) + foreachFunc: (RDD[T], Time) => Unit + ) extends DStream[Unit](parent.ssc) { override def dependencies = List(parent) @@ -478,3 +487,22 @@ extends DStream[Unit](parent.ssc) { } } } + + +/** + * TODO + */ + +class TransformedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + transformFunc: (RDD[T], Time) => RDD[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideTime: Time = parent.slideTime + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + } + } diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index eabb33d89e..f313d8c162 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -4,7 +4,6 @@ import spark.RDD import spark.Partitioner import spark.MapPartitionsRDD import spark.SparkContext._ -import javax.annotation.Nullable class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala index d5eb20b37e..030f351080 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala @@ -105,7 +105,7 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging { Seq(("a", 1), ("b", 1), ("c", 1)), Seq(("a", 2), ("b", 2), ("c", 2)), Seq(("a", 3), ("b", 3), ("c", 3)) - )//.map(array => array.toSeq.map(x => (x._1, new RichInt(x._2)))) + ) val updateStateOp =(s: DStream[String]) => { val updateFunc = (values: Seq[Int], state: RichInt) => { |