diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-07 21:13:55 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 7e1049d8f1b155a4bd742e84927c4cc83bb71cb6 (patch) | |
tree | e67cbc62d54077c5417fc785e156f4401326cd68 /streaming/src | |
parent | 74182010a4916c5b03ec74c54c21d89bcab36723 (diff) | |
download | spark-7e1049d8f1b155a4bd742e84927c4cc83bb71cb6.tar.gz spark-7e1049d8f1b155a4bd742e84927c4cc83bb71cb6.tar.bz2 spark-7e1049d8f1b155a4bd742e84927c4cc83bb71cb6.zip |
Squashing a few TODOs
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index b11859ceaf..05d89918b2 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -38,12 +38,17 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } - // TODO: Other map partitions def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) } + def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]) + : JavaPairDStream[K, V] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) + } + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) def reduceByWindow( @@ -69,10 +74,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - def scalaTransform (in: RDD[T]): RDD[U] = { + def scalaTransform (in: RDD[T]): RDD[U] = transformFunc.call(new JavaRDD[T](in)).rdd - } dstream.transform(scalaTransform(_)) } - // TODO: transform with time + + def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T], time: Time): RDD[U] = + transformFunc.call(new JavaRDD[T](in), time).rdd + dstream.transform(scalaTransform(_, _)) + } }
\ No newline at end of file |