diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-10 19:29:33 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | c2537057f9ed8723d2c33a1636edf9c9547cdc66 (patch) | |
tree | 30438ce75878f523b7d9aaf100d78fefa925b4b6 /streaming/src | |
parent | b36c4f7cce53446753ecc0ce6f9bdccb12b3350b (diff) | |
download | spark-c2537057f9ed8723d2c33a1636edf9c9547cdc66.tar.gz spark-c2537057f9ed8723d2c33a1636edf9c9547cdc66.tar.bz2 spark-c2537057f9ed8723d2c33a1636edf9c9547cdc66.zip |
Fixing issue with <Long> types
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 18 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/JavaAPISuite.java | 8 |
2 files changed, 16 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index a19a476724..fa46ca9267 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -1,6 +1,7 @@ package spark.streaming.api.java import java.util.{List => JList} +import java.lang.{Long => JLong} import scala.collection.JavaConversions._ @@ -13,6 +14,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.conf.Configuration import spark.api.java.JavaPairRDD import spark.storage.StorageLevel +import java.lang class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifiest: ClassManifest[K], @@ -107,12 +109,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) } - def countByKey(numPartitions: Int): JavaPairDStream[K, Long] = { - dstream.countByKey(numPartitions); + def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions)); } - def countByKey(): JavaPairDStream[K, Long] = { - dstream.countByKey(); + def countByKey(): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey()); } def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = { @@ -168,8 +170,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner) } - def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, Long] = { - dstream.countByKeyAndWindow(windowTime, slideTime) + def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime)) } def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) @@ -280,4 +282,8 @@ object JavaPairDStream { implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] new JavaPairDStream[K, V](dstream.dstream) } + + def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]): JavaPairDStream[K, JLong] = { + StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) + } } diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 9e8438d04c..6584d861ed 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -1,6 +1,7 @@ package spark.streaming; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -492,8 +493,7 @@ public class JavaAPISuite implements Serializable { sc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - // TODO: Below fails with compile error with <String, Long>... wtf? - JavaPairDStream<String, Object> counted = pairStream.countByKey(); + JavaPairDStream<String, Long> counted = pairStream.countByKey(); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2); @@ -589,8 +589,8 @@ public class JavaAPISuite implements Serializable { sc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - // TODO: Below fails with compile error with <String, Long>... wtf? - JavaPairDStream<String, Object> counted = pairStream.countByKeyAndWindow(new Time(2000), new Time(1000)); + JavaPairDStream<String, Long> counted = + pairStream.countByKeyAndWindow(new Time(2000), new Time(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3); |