From 74182010a4916c5b03ec74c54c21d89bcab36723 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 7 Jan 2013 20:19:57 -0800
Subject: Style cleanup and moving functions

---
 .../spark/streaming/api/java/JavaDStream.scala     |  26 ++++-
 .../spark/streaming/api/java/JavaDStreamLike.scala |  41 +------
 .../spark/streaming/api/java/JavaPairDStream.scala | 128 ++++++++++++++++-----
 3 files changed, 128 insertions(+), 67 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 9e2823d81f..9bf595e0bc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -1,14 +1,36 @@
 package spark.streaming.api.java
 
-import spark.streaming.DStream
+import spark.streaming.{Time, DStream}
 import spark.api.java.function.{Function => JFunction}
+import spark.api.java.JavaRDD
+import java.util.{List => JList}
 
 class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
     extends JavaDStreamLike[T, JavaDStream[T]] {
 
-  def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = {
+  def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
     dstream.filter((x => f(x).booleanValue()))
+
+  def cache(): JavaDStream[T] = dstream.cache()
+
+  def compute(validTime: Time): JavaRDD[T] = {
+    dstream.compute(validTime) match {
+      case Some(rdd) => new JavaRDD(rdd)
+      case None => null
+    }
   }
+
+  def window(windowTime: Time): JavaDStream[T] =
+    dstream.window(windowTime)
+
+  def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
+    dstream.window(windowTime, slideTime)
+
+  def tumble(batchTime: Time): JavaDStream[T] =
+    dstream.tumble(batchTime)
+
+  def union(that: JavaDStream[T]): JavaDStream[T] =
+    dstream.union(that.dstream)
 }
 
 object JavaDStream {
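Usage sketch (illustrative, not part of the commit): how the type-specific methods added above might be called from Java. It assumes a JavaDStream<String> named `lines` built elsewhere, and that spark.streaming.Time can be constructed from a duration in milliseconds; both are assumptions, not taken from this patch.

    import spark.api.java.function.Function;
    import spark.streaming.Time;
    import spark.streaming.api.java.JavaDStream;

    // Keep only error lines; filter() returns a typed JavaDStream directly.
    JavaDStream<String> errors = lines.filter(new Function<String, Boolean>() {
      public Boolean call(String line) {
        return line.contains("ERROR");
      }
    });

    // 30-second windows, sliding every 10 seconds (millisecond Time is assumed).
    JavaDStream<String> windowed = errors.window(new Time(30000), new Time(10000));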
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index daea56f50c..b11859ceaf 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -18,40 +18,17 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
 
   def print() = dstream.print()
 
-  // TODO move to type-specific implementations
-  def cache() : JavaDStream[T] = {
-    dstream.cache()
-  }
-
-  def count() : JavaDStream[Int] = {
-    dstream.count()
-  }
+  def count(): JavaDStream[Int] = dstream.count()
 
   def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
     dstream.countByWindow(windowTime, slideTime)
   }
 
-  def compute(validTime: Time): JavaRDD[T] = {
-    dstream.compute(validTime) match {
-      case Some(rdd) => new JavaRDD(rdd)
-      case None => null
-    }
-  }
+  def glom(): JavaDStream[JList[T]] =
+    new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
 
   def context(): StreamingContext = dstream.context()
 
-  def window(windowTime: Time): JavaDStream[T] = {
-    dstream.window(windowTime)
-  }
-
-  def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
-    dstream.window(windowTime, slideTime)
-  }
-
-  def tumble(batchTime: Time): JavaDStream[T] = {
-    dstream.tumble(batchTime)
-  }
-
   def map[R](f: JFunction[T, R]): JavaDStream[R] = {
     new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
   }
@@ -61,10 +38,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
-  def glom(): JavaDStream[JList[T]] = {
-    new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
-  }
-
   // TODO: Other map partitions
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
@@ -85,11 +58,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
     new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
   }
 
-  def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
+  def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
     dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
   }
 
-  def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
+  def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
     dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
   }
 
@@ -102,8 +75,4 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
     dstream.transform(scalaTransform(_))
   }
   // TODO: transform with time
-
-  def union(that: JavaDStream[T]): JavaDStream[T] = {
-    dstream.union(that.dstream)
-  }
 }
\ No newline at end of file
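Usage sketch (illustrative, not part of the commit): the foreach overloads above drop the `=`, making them Unit-returning procedures, but a Java caller is unchanged. This assumes a JavaDStream<String> named `words` obtained elsewhere; the Void return mirrors the JFunction[JavaRDD[T], Void] signature in the diff.

    import spark.api.java.JavaRDD;
    import spark.api.java.function.Function;

    // Runs once per batch interval against that batch's RDD.
    words.foreach(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        System.out.println("Records in batch: " + rdd.count());
        return null;
      }
    });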
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index cb80a2f3e7..f6dfbb2345 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -6,43 +6,64 @@ import scala.collection.JavaConversions._
 
 import spark.streaming._
 import spark.streaming.StreamingContext._
-import spark.api.java.function.{Function => JFunction, Function2 => JFunction2, FlatMapFunction}
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import spark.Partitioner
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.conf.Configuration
+import spark.api.java.{JavaPairRDD, JavaRDD}
 
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifiest: ClassManifest[K],
     implicit val vManifest: ClassManifest[V])
     extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
 
-  def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = {
+  // Common to all DStream's
+  def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
     dstream.filter((x => f(x).booleanValue()))
+
+  def cache(): JavaPairDStream[K, V] = dstream.cache()
+
+  def compute(validTime: Time): JavaPairRDD[K, V] = {
+    dstream.compute(validTime) match {
+      case Some(rdd) => new JavaPairRDD(rdd)
+      case None => null
+    }
   }
 
-  def groupByKey(): JavaPairDStream[K, JList[V]] = {
+  def window(windowTime: Time): JavaPairDStream[K, V] =
+    dstream.window(windowTime)
+
+  def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
+    dstream.window(windowTime, slideTime)
+
+  def tumble(batchTime: Time): JavaPairDStream[K, V] =
+    dstream.tumble(batchTime)
+
+  def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
+    dstream.union(that.dstream)
+
+  // Only for PairDStreams...
+  def groupByKey(): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey().mapValues(seqAsJavaList _)
-  }
 
-  def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = {
+  def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
-  }
 
-  def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = {
+  def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
-  }
 
-  def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = {
+  def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
     dstream.reduceByKey(func)
-  }
 
-  def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = {
+  def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
     dstream.reduceByKey(func, numPartitions)
-  }
 
   // TODO: TEST BELOW
   def combineByKey[C](createCombiner: Function[V, C],
-      mergeValue: JFunction2[C, V, C],
-      mergeCombiners: JFunction2[C, C, C],
-      partitioner: Partitioner): JavaPairDStream[K, C] = {
+    mergeValue: JFunction2[C, V, C],
+    mergeCombiners: JFunction2[C, C, C],
+    partitioner: Partitioner): JavaPairDStream[K, C] = {
     implicit val cm: ClassManifest[C] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
@@ -60,28 +81,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _)
   }
 
-  def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int):
-  JavaPairDStream[K, JList[V]] = {
+  def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+  :JavaPairDStream[K, JList[V]] = {
     dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _)
   }
 
-  def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner):
-  JavaPairDStream[K, JList[V]] = {
+  def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner)
+  :JavaPairDStream[K, JList[V]] = {
     dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _)
   }
 
-  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time):
-  JavaPairDStream[K, V] = {
+  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time)
+  :JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowTime)
   }
 
-  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time):
-  JavaPairDStream[K, V] = {
+  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time)
+  :JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime)
   }
 
-  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time,
-      numPartitions: Int): JavaPairDStream[K, V] = {
+  def reduceByKeyAndWindow(
+    reduceFunc: Function2[V, V, V],
+    windowTime: Time,
+    slideTime: Time,
+    numPartitions: Int): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions)
   }
 
@@ -136,7 +160,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-      : JavaPairDStream[K, (JList[V], JList[W])] = {
+    : JavaPairDStream[K, (JList[V], JList[W])] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     dstream.cogroup(other.dstream, partitioner)
@@ -150,19 +174,65 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
-      : JavaPairDStream[K, (V, W)] = {
+    : JavaPairDStream[K, (V, W)] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     dstream.join(other.dstream, partitioner)
   }
 
+  def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+    dstream.saveAsHadoopFiles(prefix, suffix)
+  }
+
+  def saveAsHadoopFiles(
+    prefix: String,
+    suffix: String,
+    keyClass: Class[_],
+    valueClass: Class[_],
+    outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+    dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+  }
+
+  def saveAsHadoopFiles(
+    prefix: String,
+    suffix: String,
+    keyClass: Class[_],
+    valueClass: Class[_],
+    outputFormatClass: Class[_ <: OutputFormat[_, _]],
+    conf: JobConf) {
+    dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+  }
+
+  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+    dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
+  }
+
+  def saveAsNewAPIHadoopFiles(
+    prefix: String,
+    suffix: String,
+    keyClass: Class[_],
+    valueClass: Class[_],
+    outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+    dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+  }
+
+  def saveAsNewAPIHadoopFiles(
+    prefix: String,
+    suffix: String,
+    keyClass: Class[_],
+    valueClass: Class[_],
+    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+    conf: Configuration = new Configuration) {
+    dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+  }
+
   override val classManifest: ClassManifest[(K, V)] =
     implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]):
-  JavaPairDStream[K, V] =
+  implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
+  :JavaPairDStream[K, V] =
    new JavaPairDStream[K, V](dstream)
 
   def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
-- 
cgit v1.2.3
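Usage sketch (illustrative, not part of the commit): combining the "common" windowing block with the pair-specific methods above. It assumes a JavaPairDStream<String, Integer> named `pairs` built elsewhere, and the millisecond Time constructor assumed in the earlier sketches.

    import spark.api.java.function.Function2;
    import spark.streaming.Time;
    import spark.streaming.api.java.JavaPairDStream;

    // Sum values per key over a 60-second window that slides every 10 seconds.
    // window() is from the common block; reduceByKey() takes the JFunction2
    // alias, so spark's Java Function2 class is used from Java code.
    JavaPairDStream<String, Integer> counts = pairs
        .window(new Time(60000), new Time(10000))
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });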