path: root/streaming/src
author     Patrick Wendell <pwendell@gmail.com>  2013-01-10 19:29:33 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2013-01-14 09:42:36 -0800
commit     c2537057f9ed8723d2c33a1636edf9c9547cdc66 (patch)
tree       30438ce75878f523b7d9aaf100d78fefa925b4b6 /streaming/src
parent     b36c4f7cce53446753ecc0ce6f9bdccb12b3350b (diff)
Fixing issue with <Long> types
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala  18
-rw-r--r--  streaming/src/test/scala/spark/streaming/JavaAPISuite.java  8
2 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index a19a476724..fa46ca9267 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -1,6 +1,7 @@
package spark.streaming.api.java
import java.util.{List => JList}
+import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
@@ -13,6 +14,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.JavaPairRDD
import spark.storage.StorageLevel
+import java.lang
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
@@ -107,12 +109,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
- def countByKey(numPartitions: Int): JavaPairDStream[K, Long] = {
- dstream.countByKey(numPartitions);
+ def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
}
- def countByKey(): JavaPairDStream[K, Long] = {
- dstream.countByKey();
+ def countByKey(): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKey());
}
def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = {
@@ -168,8 +170,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner)
}
- def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, Long] = {
- dstream.countByKeyAndWindow(windowTime, slideTime)
+ def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime))
}
def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
@@ -280,4 +282,8 @@ object JavaPairDStream {
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
new JavaPairDStream[K, V](dstream.dstream)
}
+
+ def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]): JavaPairDStream[K, JLong] = {
+ StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+ }
}
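
The new scalaToJavaLong helper boxes the Scala Long values produced by countByKey and countByKeyAndWindow into java.lang.Long, so the Java-facing signatures bind their generic parameter to a real reference type instead of falling back to Object. A minimal standalone sketch of the same boxing pattern, using plain collections rather than Spark's DStream classes (names here are illustrative only, not part of the commit):

import java.lang.{Long => JLong}

object LongBoxingSketch {
  // Box Scala's primitive Long values into java.lang.Long, mirroring what
  // scalaToJavaLong does via mapValues over the underlying DStream.
  def box(counts: Map[String, Long]): Map[String, JLong] =
    counts.map { case (k, v) => (k, JLong.valueOf(v)) }

  def main(args: Array[String]): Unit = {
    // Java callers can now bind a <String, java.lang.Long> type parameter
    // to results shaped like this.
    println(box(Map("spark" -> 3L, "streaming" -> 5L)))
  }
}
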
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 9e8438d04c..6584d861ed 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -1,6 +1,7 @@
package spark.streaming;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -492,8 +493,7 @@ public class JavaAPISuite implements Serializable {
sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- // TODO: Below fails with compile error with <String, Long>... wtf?
- JavaPairDStream<String, Object> counted = pairStream.countByKey();
+ JavaPairDStream<String, Long> counted = pairStream.countByKey();
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
@@ -589,8 +589,8 @@ public class JavaAPISuite implements Serializable {
sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- // TODO: Below fails with compile error with <String, Long>... wtf?
- JavaPairDStream<String, Object> counted = pairStream.countByKeyAndWindow(new Time(2000), new Time(1000));
+ JavaPairDStream<String, Long> counted =
+ pairStream.countByKeyAndWindow(new Time(2000), new Time(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);