From 7e1049d8f1b155a4bd742e84927c4cc83bb71cb6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 7 Jan 2013 21:13:55 -0800 Subject: Squashing a few TODOs --- .../spark/streaming/api/java/JavaDStreamLike.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index b11859ceaf..05d89918b2 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -38,12 +38,17 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } - // TODO: Other map partitions def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) } + def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]) + : JavaPairDStream[K, V] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) + } + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) def reduceByWindow( @@ -69,10 +74,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] - def scalaTransform (in: RDD[T]): RDD[U] = { + def scalaTransform (in: RDD[T]): RDD[U] = transformFunc.call(new JavaRDD[T](in)).rdd - } dstream.transform(scalaTransform(_)) } - // TODO: transform with time + + def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T], time: Time): RDD[U] = + transformFunc.call(new JavaRDD[T](in), time).rdd + dstream.transform(scalaTransform(_, _)) + } } \ No newline at end of file -- cgit v1.2.3