path: root/streaming
author     Patrick Wendell <pwendell@gmail.com>    2013-01-07 20:19:57 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2013-01-14 09:42:36 -0800
commit     74182010a4916c5b03ec74c54c21d89bcab36723 (patch)
tree       16be9fb6ec06209f98d0064554b2a9e37e4b4935 /streaming
parent     056f5efc557a8fcb8871d5abbee082b6398ba78c (diff)
Style cleanup and moving functions
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala       26
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala    41
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala   128
3 files changed, 128 insertions, 67 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 9e2823d81f..9bf595e0bc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -1,14 +1,36 @@
package spark.streaming.api.java
-import spark.streaming.DStream
+import spark.streaming.{Time, DStream}
import spark.api.java.function.{Function => JFunction}
+import spark.api.java.JavaRDD
+import java.util.{List => JList}
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
extends JavaDStreamLike[T, JavaDStream[T]] {
- def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = {
+ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
dstream.filter((x => f(x).booleanValue()))
+
+ def cache(): JavaDStream[T] = dstream.cache()
+
+ def compute(validTime: Time): JavaRDD[T] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaRDD(rdd)
+ case None => null
+ }
}
+
+ def window(windowTime: Time): JavaDStream[T] =
+ dstream.window(windowTime)
+
+ def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
+ dstream.window(windowTime, slideTime)
+
+ def tumble(batchTime: Time): JavaDStream[T] =
+ dstream.tumble(batchTime)
+
+ def union(that: JavaDStream[T]): JavaDStream[T] =
+ dstream.union(that.dstream)
}
object JavaDStream {
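
The methods moved into JavaDStream above all follow the same delegation pattern: each Java-facing method forwards to the wrapped Scala DStream and relies on an implicit conversion (declared in the companion object, as JavaPairDStream does with fromPairDStream further down) to re-wrap the result. A minimal, self-contained sketch of that pattern, using simplified stand-in classes (SimpleDStream and JavaWrapper are illustrative names, not the real Spark types):

    import scala.language.implicitConversions

    // Simplified stand-ins, only to illustrate the wrap/delegate/re-wrap pattern.
    class SimpleDStream[T](val description: String) {
      def window(windowTime: Long): SimpleDStream[T] =
        new SimpleDStream[T](s"$description.window($windowTime)")
      def union(that: SimpleDStream[T]): SimpleDStream[T] =
        new SimpleDStream[T](s"$description.union(${that.description})")
    }

    class JavaWrapper[T](val dstream: SimpleDStream[T]) {
      // Each wrapper method delegates to the underlying stream; the implicit
      // conversion in the companion object turns the result back into a wrapper.
      def window(windowTime: Long): JavaWrapper[T] = dstream.window(windowTime)
      def union(that: JavaWrapper[T]): JavaWrapper[T] = dstream.union(that.dstream)
    }

    object JavaWrapper {
      implicit def fromSimpleDStream[T](dstream: SimpleDStream[T]): JavaWrapper[T] =
        new JavaWrapper[T](dstream)
    }

    object WrapperDemo extends App {
      val s = new JavaWrapper[Int](new SimpleDStream[Int]("stream"))
      println(s.window(10).union(s).dstream.description)  // stream.window(10).union(stream)
    }
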
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index daea56f50c..b11859ceaf 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -18,40 +18,17 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def print() = dstream.print()
- // TODO move to type-specific implementations
- def cache() : JavaDStream[T] = {
- dstream.cache()
- }
-
- def count() : JavaDStream[Int] = {
- dstream.count()
- }
+ def count(): JavaDStream[Int] = dstream.count()
def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
dstream.countByWindow(windowTime, slideTime)
}
- def compute(validTime: Time): JavaRDD[T] = {
- dstream.compute(validTime) match {
- case Some(rdd) => new JavaRDD(rdd)
- case None => null
- }
- }
+ def glom(): JavaDStream[JList[T]] =
+ new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
def context(): StreamingContext = dstream.context()
- def window(windowTime: Time): JavaDStream[T] = {
- dstream.window(windowTime)
- }
-
- def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
- dstream.window(windowTime, slideTime)
- }
-
- def tumble(batchTime: Time): JavaDStream[T] = {
- dstream.tumble(batchTime)
- }
-
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
}
@@ -61,10 +38,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
- def glom(): JavaDStream[JList[T]] = {
- new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
- }
-
// TODO: Other map partitions
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
@@ -85,11 +58,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
}
- def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
+ def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
}
- def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
+ def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
}
@@ -102,8 +75,4 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
dstream.transform(scalaTransform(_))
}
// TODO: transform with time
-
- def union(that: JavaDStream[T]): JavaDStream[T] = {
- dstream.union(that.dstream)
- }
} \ No newline at end of file
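
One of the style changes in JavaDStreamLike above replaces def foreach(...) = { ... } with procedure syntax, def foreach(...) { ... }, so the Unit return type is stated by the form of the definition rather than inferred from the body. A standalone illustration of the difference (not Spark code; procedure syntax was idiomatic at the time, though later Scala versions deprecate it in favour of an explicit ": Unit ="):

    object ProcedureSyntaxDemo extends App {
      // With '=' and no declared return type, the type is inferred from the body.
      def doubled(x: Int) = x * 2                 // inferred return type: Int

      // Procedure syntax (no '=') always returns Unit, matching the intent of
      // side-effecting methods such as the foreach overloads above.
      def printDoubled(x: Int) { println(x * 2) } // return type: Unit

      println(doubled(21))  // 42
      printDoubled(21)      // prints 42
    }
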
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index cb80a2f3e7..f6dfbb2345 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -6,43 +6,64 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
-import spark.api.java.function.{Function => JFunction, Function2 => JFunction2, FlatMapFunction}
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import spark.Partitioner
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.conf.Configuration
+import spark.api.java.{JavaPairRDD, JavaRDD}
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
- def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = {
+ // Common to all DStream's
+ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
dstream.filter((x => f(x).booleanValue()))
+
+ def cache(): JavaPairDStream[K, V] = dstream.cache()
+
+ def compute(validTime: Time): JavaPairRDD[K, V] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaPairRDD(rdd)
+ case None => null
+ }
}
- def groupByKey(): JavaPairDStream[K, JList[V]] = {
+ def window(windowTime: Time): JavaPairDStream[K, V] =
+ dstream.window(windowTime)
+
+ def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
+ dstream.window(windowTime, slideTime)
+
+ def tumble(batchTime: Time): JavaPairDStream[K, V] =
+ dstream.tumble(batchTime)
+
+ def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
+ dstream.union(that.dstream)
+
+ // Only for PairDStreams...
+ def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
- }
- def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = {
+ def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
- }
- def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = {
+ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
- }
- def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = {
+ def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
dstream.reduceByKey(func)
- }
- def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = {
+ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
- }
// TODO: TEST BELOW
def combineByKey[C](createCombiner: Function[V, C],
- mergeValue: JFunction2[C, V, C],
- mergeCombiners: JFunction2[C, C, C],
- partitioner: Partitioner): JavaPairDStream[K, C] = {
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner): JavaPairDStream[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
@@ -60,28 +81,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int):
- JavaPairDStream[K, JList[V]] = {
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+ :JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner):
- JavaPairDStream[K, JList[V]] = {
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner)
+ :JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time):
- JavaPairDStream[K, V] = {
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time)
+ :JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowTime)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time):
- JavaPairDStream[K, V] = {
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time)
+ :JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time,
- numPartitions: Int): JavaPairDStream[K, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions)
}
@@ -136,7 +160,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (JList[V], JList[W])] = {
+ : JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.cogroup(other.dstream, partitioner)
@@ -150,19 +174,65 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
- : JavaPairDStream[K, (V, W)] = {
+ : JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream, partitioner)
}
+ def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+ dstream.saveAsHadoopFiles(prefix, suffix)
+ }
+
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+ dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+ }
+
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf) {
+ dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+ }
+
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
+ }
+
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+ }
+
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+ }
+
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]):
- JavaPairDStream[K, V] =
+ implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
+ :JavaPairDStream[K, V] =
new JavaPairDStream[K, V](dstream)
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
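
The new saveAsHadoopFiles / saveAsNewAPIHadoopFiles wrappers take the key class, value class and OutputFormat class as plain Class[_] arguments. A hedged sketch of how those arguments are typically built on the Scala side, using Hadoop's standard TextOutputFormat; the stream itself is left out, and `pairs` in the trailing comment is a hypothetical JavaPairDStream[Text, IntWritable]:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{JobConf, OutputFormat, TextOutputFormat}

    object SaveAsHadoopFilesArgs extends App {
      // Class arguments in the shape the new overloads expect, obtained via classOf.
      val keyClass: Class[_]                          = classOf[Text]
      val valueClass: Class[_]                        = classOf[IntWritable]
      val formatClass: Class[_ <: OutputFormat[_, _]] = classOf[TextOutputFormat[Text, IntWritable]]

      println(Seq(keyClass, valueClass, formatClass).map(_.getSimpleName).mkString(", "))
      // prints: Text, IntWritable, TextOutputFormat

      // Hypothetical call on pairs: JavaPairDStream[Text, IntWritable]:
      // pairs.saveAsHadoopFiles("hdfs:///out/counts", "txt",
      //                         keyClass, valueClass, formatClass, new JobConf())
    }
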