aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-07 21:13:55 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commit7e1049d8f1b155a4bd742e84927c4cc83bb71cb6 (patch)
treee67cbc62d54077c5417fc785e156f4401326cd68 /streaming/src
parent74182010a4916c5b03ec74c54c21d89bcab36723 (diff)
downloadspark-7e1049d8f1b155a4bd742e84927c4cc83bb71cb6.tar.gz
spark-7e1049d8f1b155a4bd742e84927c4cc83bb71cb6.tar.bz2
spark-7e1049d8f1b155a4bd742e84927c4cc83bb71cb6.zip
Squashing a few TODOs
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala19
1 files changed, 15 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b11859ceaf..05d89918b2 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -38,12 +38,17 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
- // TODO: Other map partitions
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
}
+ def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
+ : JavaPairDStream[K, V] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+ }
+
def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
def reduceByWindow(
@@ -69,10 +74,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
- def scalaTransform (in: RDD[T]): RDD[U] = {
+ def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(new JavaRDD[T](in)).rdd
- }
dstream.transform(scalaTransform(_))
}
- // TODO: transform with time
+
+ def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T], time: Time): RDD[U] =
+ transformFunc.call(new JavaRDD[T](in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
} \ No newline at end of file