From c2537057f9ed8723d2c33a1636edf9c9547cdc66 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 10 Jan 2013 19:29:33 -0800 Subject: Fixing issue with types --- .../spark/streaming/api/java/JavaPairDStream.scala | 18 ++++++++++++------ .../src/test/scala/spark/streaming/JavaAPISuite.java | 8 ++++---- 2 files changed, 16 insertions(+), 10 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index a19a476724..fa46ca9267 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -1,6 +1,7 @@ package spark.streaming.api.java import java.util.{List => JList} +import java.lang.{Long => JLong} import scala.collection.JavaConversions._ @@ -13,6 +14,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.conf.Configuration import spark.api.java.JavaPairRDD import spark.storage.StorageLevel +import java.lang class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifiest: ClassManifest[K], @@ -107,12 +109,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) } - def countByKey(numPartitions: Int): JavaPairDStream[K, Long] = { - dstream.countByKey(numPartitions); + def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions)); } - def countByKey(): JavaPairDStream[K, Long] = { - dstream.countByKey(); + def countByKey(): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey()); } def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = { @@ -168,8 +170,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner) } - def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, Long] = { - dstream.countByKeyAndWindow(windowTime, slideTime) + def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime)) } def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) @@ -280,4 +282,8 @@ object JavaPairDStream { implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] new JavaPairDStream[K, V](dstream.dstream) } + + def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]): JavaPairDStream[K, JLong] = { + StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) + } } diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 9e8438d04c..6584d861ed 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -1,6 +1,7 @@ package spark.streaming; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -492,8 +493,7 @@ public class JavaAPISuite implements Serializable { sc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - // TODO: Below fails with compile error with ... wtf? - JavaPairDStream counted = pairStream.countByKey(); + JavaPairDStream counted = pairStream.countByKey(); JavaTestUtils.attachTestOutputStream(counted); List>> result = JavaTestUtils.runStreams(sc, 2, 2); @@ -589,8 +589,8 @@ public class JavaAPISuite implements Serializable { sc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - // TODO: Below fails with compile error with ... wtf? - JavaPairDStream counted = pairStream.countByKeyAndWindow(new Time(2000), new Time(1000)); + JavaPairDStream counted = + pairStream.countByKeyAndWindow(new Time(2000), new Time(1000)); JavaTestUtils.attachTestOutputStream(counted); List>> result = JavaTestUtils.runStreams(sc, 3, 3); -- cgit v1.2.3