aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-16 13:17:45 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 18:41:58 -0800
commit2a872335c5c7b5481c927272447e4a344ef59dda (patch)
treee1819828fb76169e488ee68b8f6daf7c670499a9 /streaming/src
parenta0013beb039c8569e9e69f96fce0c341d1a1d180 (diff)
downloadspark-2a872335c5c7b5481c927272447e4a344ef59dda.tar.gz
spark-2a872335c5c7b5481c927272447e4a344ef59dda.tar.bz2
spark-2a872335c5c7b5481c927272447e4a344ef59dda.zip
Bug fix and test cleanup
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala4
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java17
2 files changed, 12 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 49a0f27b5b..1c5b864ff0 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -433,7 +433,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val cm: ClassManifest[S] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
- def scalaFunc(values: Seq[V], state: Option[S]): Option[S] = {
+ val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
val list: JList[V] = values
val scalaState: Optional[S] = state match {
case Some(s) => Optional.of(s)
@@ -445,7 +445,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
case _ => None
}
}
- dstream.updateStateByKey(scalaFunc _)
+ dstream.updateStateByKey(scalaFunc)
}
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index d95ab485f8..549fb5b733 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -52,15 +52,15 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(3,4,5),
Arrays.asList(3));
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(4),
- Arrays.asList(3),
- Arrays.asList(1));
+ List<List<Long>> expected = Arrays.asList(
+ Arrays.asList(4L),
+ Arrays.asList(3L),
+ Arrays.asList(1L));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@@ -561,8 +561,8 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 5)),
Arrays.asList(new Tuple2<String, Integer>("california", 14),
new Tuple2<String, Integer>("new york", 9)),
- Arrays.asList(new Tuple2<String, Integer>("california", 10),
- new Tuple2<String, Integer>("new york", 4)));
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -572,6 +572,9 @@ public class JavaAPISuite implements Serializable {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
for (Integer v: values) {
out = out + v;
}