diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-07 11:02:03 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 6e514a8d3511891a3f7221c594171477a0b5a38f (patch) | |
tree | 35b51d5f9dc195bca60d2f07f88724ee314b21cd /streaming/src/main | |
parent | f144e0413a1e42d193a86fa04af769e2da9dc58b (diff) | |
download | spark-6e514a8d3511891a3f7221c594171477a0b5a38f.tar.gz spark-6e514a8d3511891a3f7221c594171477a0b5a38f.tar.bz2 spark-6e514a8d3511891a3f7221c594171477a0b5a38f.zip |
PairDStream and DStreamLike
Diffstat (limited to 'streaming/src/main')
3 files changed, 248 insertions, 97 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 56e54c719a..9e2823d81f 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -1,109 +1,17 @@ package spark.streaming.api.java -import java.util.{List => JList} +import spark.streaming.DStream +import spark.api.java.function.{Function => JFunction} -import scala.collection.JavaConversions._ - -import spark.streaming._ -import spark.api.java.JavaRDD -import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} -import java.util -import spark.RDD - -class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) { - def print() = dstream.print() - - // TODO move to type-specific implementations - def cache() : JavaDStream[T] = { - dstream.cache() - } - - def count() : JavaDStream[Int] = { - dstream.count() - } - - def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = { - dstream.countByWindow(windowTime, slideTime) - } - - def compute(validTime: Time): JavaRDD[T] = { - dstream.compute(validTime) match { - case Some(rdd) => new JavaRDD(rdd) - case None => null - } - } - - def context(): StreamingContext = dstream.context() - - def window(windowTime: Time): JavaDStream[T] = { - dstream.window(windowTime) - } - - def window(windowTime: Time, slideTime: Time): JavaDStream[T] = { - dstream.window(windowTime, slideTime) - } - - def tumble(batchTime: Time): JavaDStream[T] = { - dstream.tumble(batchTime) - } - - def map[R](f: JFunction[T, R]): JavaDStream[R] = { - new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) - } +class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) + extends JavaDStreamLike[T, JavaDStream[T]] { def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = { dstream.filter((x => f(x).booleanValue())) } - - def glom(): JavaDStream[JList[T]] = { - new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) - } - - // TODO: Other map partitions - def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) - } - - def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) - - def reduceByWindow( - reduceFunc: JFunction2[T, T, T], - invReduceFunc: JFunction2[T, T, T], - windowTime: Time, - slideTime: Time): JavaDStream[T] = { - dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime) - } - - def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = { - new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq) - } - - def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = { - dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) - } - - def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = { - dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) - } - - def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - def scalaTransform (in: RDD[T]): RDD[U] = { - transformFunc.call(new JavaRDD[T](in)).rdd - } - dstream.transform(scalaTransform(_)) - } - // TODO: transform with time - - def union(that: JavaDStream[T]): JavaDStream[T] = { - dstream.union(that.dstream) - } } object JavaDStream { implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = new JavaDStream[T](dstream) - -}
\ No newline at end of file +} diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala new file mode 100644 index 0000000000..daea56f50c --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -0,0 +1,109 @@ +package spark.streaming.api.java + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.api.java.JavaRDD +import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import java.util +import spark.RDD +import JavaDStream._ + +trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable { + implicit val classManifest: ClassManifest[T] + + def dstream: DStream[T] + + def print() = dstream.print() + + // TODO move to type-specific implementations + def cache() : JavaDStream[T] = { + dstream.cache() + } + + def count() : JavaDStream[Int] = { + dstream.count() + } + + def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = { + dstream.countByWindow(windowTime, slideTime) + } + + def compute(validTime: Time): JavaRDD[T] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaRDD(rdd) + case None => null + } + } + + def context(): StreamingContext = dstream.context() + + def window(windowTime: Time): JavaDStream[T] = { + dstream.window(windowTime) + } + + def window(windowTime: Time, slideTime: Time): JavaDStream[T] = { + dstream.window(windowTime, slideTime) + } + + def tumble(batchTime: Time): JavaDStream[T] = { + dstream.tumble(batchTime) + } + + def map[R](f: JFunction[T, R]): JavaDStream[R] = { + new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) + } + + def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) + } + + def glom(): JavaDStream[JList[T]] = { + new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + } + + // TODO: Other map partitions + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) + } + + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) + + def reduceByWindow( + reduceFunc: JFunction2[T, T, T], + invReduceFunc: JFunction2[T, T, T], + windowTime: Time, + slideTime: Time): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + } + + def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = { + new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq) + } + + def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = { + dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) + } + + def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = { + dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) + } + + def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T]): RDD[U] = { + transformFunc.call(new JavaRDD[T](in)).rdd + } + dstream.transform(scalaTransform(_)) + } + // TODO: transform with time + + def union(that: JavaDStream[T]): JavaDStream[T] = { + dstream.union(that.dstream) + } +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala new file mode 100644 index 0000000000..01dda24fde --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -0,0 +1,134 @@ +package spark.streaming.api.java + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.streaming.StreamingContext._ +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import spark.Partitioner + +class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( + implicit val kManifiest: ClassManifest[K], + implicit val vManifest: ClassManifest[V]) + extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] { + + def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = { + dstream.filter((x => f(x).booleanValue())) + } + + def groupByKey(): JavaPairDStream[K, JList[V]] = { + dstream.groupByKey().mapValues(seqAsJavaList _) + } + + def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = { + dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + } + + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = { + dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + } + + def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = { + dstream.reduceByKey(func) + } + + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKey(func, numPartitions) + } + + // TODO: TEST BELOW + def combineByKey[C](createCombiner: Function[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner): JavaPairDStream[K, C] = { + implicit val cm: ClassManifest[C] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) + } + + def countByKey(numPartitions: Int): JavaPairDStream[K, Long] = { + dstream.countByKey(numPartitions); + } + + def countByKey(): JavaPairDStream[K, Long] = { + dstream.countByKey(); + } + + def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _) + } + + def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int): + JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _) + } + + def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner): + JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time): + JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time): + JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time, + numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time, + partitioner: Partitioner): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], + windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], + windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], + windowTime: Time, slideTime: Time, partitioner: Partitioner) + : JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner) + } + + def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, Long] = { + dstream.countByKeyAndWindow(windowTime, slideTime) + } + + def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) + : JavaPairDStream[K, Long] = { + dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions) + } + + override val classManifest: ClassManifest[(K, V)] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] +} + +object JavaPairDStream { + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]): + JavaPairDStream[K, V] = + new JavaPairDStream[K, V](dstream) + + def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairDStream[K, V](dstream.dstream) + } +} |