diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-07 11:02:03 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 6e514a8d3511891a3f7221c594171477a0b5a38f (patch) | |
tree | 35b51d5f9dc195bca60d2f07f88724ee314b21cd /streaming | |
parent | f144e0413a1e42d193a86fa04af769e2da9dc58b (diff) | |
download | spark-6e514a8d3511891a3f7221c594171477a0b5a38f.tar.gz spark-6e514a8d3511891a3f7221c594171477a0b5a38f.tar.bz2 spark-6e514a8d3511891a3f7221c594171477a0b5a38f.zip |
PairDStream and DStreamLike
Diffstat (limited to 'streaming')
5 files changed, 359 insertions, 100 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 56e54c719a..9e2823d81f 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -1,109 +1,17 @@ package spark.streaming.api.java -import java.util.{List => JList} +import spark.streaming.DStream +import spark.api.java.function.{Function => JFunction} -import scala.collection.JavaConversions._ - -import spark.streaming._ -import spark.api.java.JavaRDD -import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} -import java.util -import spark.RDD - -class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) { - def print() = dstream.print() - - // TODO move to type-specific implementations - def cache() : JavaDStream[T] = { - dstream.cache() - } - - def count() : JavaDStream[Int] = { - dstream.count() - } - - def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = { - dstream.countByWindow(windowTime, slideTime) - } - - def compute(validTime: Time): JavaRDD[T] = { - dstream.compute(validTime) match { - case Some(rdd) => new JavaRDD(rdd) - case None => null - } - } - - def context(): StreamingContext = dstream.context() - - def window(windowTime: Time): JavaDStream[T] = { - dstream.window(windowTime) - } - - def window(windowTime: Time, slideTime: Time): JavaDStream[T] = { - dstream.window(windowTime, slideTime) - } - - def tumble(batchTime: Time): JavaDStream[T] = { - dstream.tumble(batchTime) - } - - def map[R](f: JFunction[T, R]): JavaDStream[R] = { - new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) - } +class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) + extends JavaDStreamLike[T, JavaDStream[T]] { def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = { dstream.filter((x => f(x).booleanValue())) } - - def glom(): JavaDStream[JList[T]] = { - new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) - } - - // TODO: Other map partitions - def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) - } - - def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) - - def reduceByWindow( - reduceFunc: JFunction2[T, T, T], - invReduceFunc: JFunction2[T, T, T], - windowTime: Time, - slideTime: Time): JavaDStream[T] = { - dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime) - } - - def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = { - new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq) - } - - def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = { - dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) - } - - def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = { - dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) - } - - def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - def scalaTransform (in: RDD[T]): RDD[U] = { - transformFunc.call(new JavaRDD[T](in)).rdd - } - dstream.transform(scalaTransform(_)) - } - // TODO: transform with time - - def union(that: JavaDStream[T]): JavaDStream[T] = { - dstream.union(that.dstream) - } } object JavaDStream { implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = new JavaDStream[T](dstream) - -}
\ No newline at end of file +} diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala new file mode 100644 index 0000000000..daea56f50c --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -0,0 +1,109 @@ +package spark.streaming.api.java + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.api.java.JavaRDD +import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import java.util +import spark.RDD +import JavaDStream._ + +trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable { + implicit val classManifest: ClassManifest[T] + + def dstream: DStream[T] + + def print() = dstream.print() + + // TODO move to type-specific implementations + def cache() : JavaDStream[T] = { + dstream.cache() + } + + def count() : JavaDStream[Int] = { + dstream.count() + } + + def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = { + dstream.countByWindow(windowTime, slideTime) + } + + def compute(validTime: Time): JavaRDD[T] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaRDD(rdd) + case None => null + } + } + + def context(): StreamingContext = dstream.context() + + def window(windowTime: Time): JavaDStream[T] = { + dstream.window(windowTime) + } + + def window(windowTime: Time, slideTime: Time): JavaDStream[T] = { + dstream.window(windowTime, slideTime) + } + + def tumble(batchTime: Time): JavaDStream[T] = { + dstream.tumble(batchTime) + } + + def map[R](f: JFunction[T, R]): JavaDStream[R] = { + new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) + } + + def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) + } + + def glom(): JavaDStream[JList[T]] = { + new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + } + + // TODO: Other map partitions + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) + } + + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) + + def reduceByWindow( + reduceFunc: JFunction2[T, T, T], + invReduceFunc: JFunction2[T, T, T], + windowTime: Time, + slideTime: Time): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + } + + def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = { + new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq) + } + + def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = { + dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) + } + + def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = { + dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) + } + + def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T]): RDD[U] = { + transformFunc.call(new JavaRDD[T](in)).rdd + } + dstream.transform(scalaTransform(_)) + } + // TODO: transform with time + + def union(that: JavaDStream[T]): JavaDStream[T] = { + dstream.union(that.dstream) + } +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala new file mode 100644 index 0000000000..01dda24fde --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -0,0 +1,134 @@ +package spark.streaming.api.java + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.streaming.StreamingContext._ +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import spark.Partitioner + +class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( + implicit val kManifiest: ClassManifest[K], + implicit val vManifest: ClassManifest[V]) + extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] { + + def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = { + dstream.filter((x => f(x).booleanValue())) + } + + def groupByKey(): JavaPairDStream[K, JList[V]] = { + dstream.groupByKey().mapValues(seqAsJavaList _) + } + + def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = { + dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + } + + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = { + dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + } + + def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = { + dstream.reduceByKey(func) + } + + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKey(func, numPartitions) + } + + // TODO: TEST BELOW + def combineByKey[C](createCombiner: Function[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner): JavaPairDStream[K, C] = { + implicit val cm: ClassManifest[C] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) + } + + def countByKey(numPartitions: Int): JavaPairDStream[K, Long] = { + dstream.countByKey(numPartitions); + } + + def countByKey(): JavaPairDStream[K, Long] = { + dstream.countByKey(); + } + + def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _) + } + + def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int): + JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _) + } + + def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner): + JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time): + JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time): + JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time, + numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time, + partitioner: Partitioner): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], + windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], + windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions) + } + + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], + windowTime: Time, slideTime: Time, partitioner: Partitioner) + : JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner) + } + + def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, Long] = { + dstream.countByKeyAndWindow(windowTime, slideTime) + } + + def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) + : JavaPairDStream[K, Long] = { + dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions) + } + + override val classManifest: ClassManifest[(K, V)] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] +} + +object JavaPairDStream { + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]): + JavaPairDStream[K, V] = + new JavaPairDStream[K, V](dstream) + + def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairDStream[K, V](dstream.dstream) + } +} diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala index 776b0e6bb6..9f3a80df8b 100644 --- a/streaming/src/test/scala/JavaTestUtils.scala +++ b/streaming/src/test/scala/JavaTestUtils.scala @@ -2,7 +2,7 @@ package spark.streaming import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import java.util.{List => JList} -import spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} import spark.streaming._ import java.util.ArrayList import collection.JavaConversions._ @@ -20,7 +20,8 @@ object JavaTestUtils extends TestSuiteBase { new JavaDStream[T](dstream) } - def attachTestOutputStream[T](dstream: JavaDStream[T]) = { + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]] + (dstream: JavaDStreamLike[T, This]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] val ostream = new TestOutputStream(dstream.dstream, @@ -37,6 +38,5 @@ object JavaTestUtils extends TestSuiteBase { res.map(entry => out.append(new ArrayList[V](entry))) out } - } diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index c4629c8d97..c1373e6275 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -5,12 +5,15 @@ import org.junit.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; +import scala.Tuple2; import spark.api.java.JavaRDD; import spark.api.java.function.FlatMapFunction; import spark.api.java.function.Function; import spark.api.java.function.Function2; +import spark.api.java.function.PairFunction; import spark.streaming.JavaTestUtils; import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; import spark.streaming.api.java.JavaStreamingContext; import java.io.Serializable; @@ -340,4 +343,109 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, actual); } + + // PairDStream Functions + @Test + public void testPairFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("giants", 6)), + Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = stream.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2 call(String in) throws Exception { + return new Tuple2<String, Integer>(in, in.length()); + } + }); + + JavaPairDStream<String, Integer> filtered = pairStream.filter( + new Function<Tuple2<String, Integer>, Boolean>() { + @Override + public Boolean call(Tuple2<String, Integer> in) throws Exception { + return in._1().contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairGroupByKey() { + List<List<Tuple2<String, String>>> inputData = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "yankees"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "rangers"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), + new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), + new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey(); + JavaTestUtils.attachTestOutputStream(grouped); + List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairReduceByKey() { + List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 1), + new Tuple2<String, Integer>("california", 3), + new Tuple2<String, Integer>("new york", 4), + new Tuple2<String, Integer>("new york", 1)), + Arrays.asList( + new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("new york", 3), + new Tuple2<String, Integer>("new york", 1))); + + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList( + new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey( + new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } } |