diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-16 13:17:45 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 18:41:58 -0800 |
commit | 2a872335c5c7b5481c927272447e4a344ef59dda (patch) | |
tree | e1819828fb76169e488ee68b8f6daf7c670499a9 /streaming/src | |
parent | a0013beb039c8569e9e69f96fce0c341d1a1d180 (diff) | |
download | spark-2a872335c5c7b5481c927272447e4a344ef59dda.tar.gz spark-2a872335c5c7b5481c927272447e4a344ef59dda.tar.bz2 spark-2a872335c5c7b5481c927272447e4a344ef59dda.zip |
Bug fix and test cleanup
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 4 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/JavaAPISuite.java | 17 |
2 files changed, 12 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index 49a0f27b5b..1c5b864ff0 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -433,7 +433,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val cm: ClassManifest[S] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] - def scalaFunc(values: Seq[V], state: Option[S]): Option[S] = { + val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => { val list: JList[V] = values val scalaState: Optional[S] = state match { case Some(s) => Optional.of(s) @@ -445,7 +445,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( case _ => None } } - dstream.updateStateByKey(scalaFunc _) + dstream.updateStateByKey(scalaFunc) } def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index d95ab485f8..549fb5b733 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -52,15 +52,15 @@ public class JavaAPISuite implements Serializable { Arrays.asList(3,4,5), Arrays.asList(3)); - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(4), - Arrays.asList(3), - Arrays.asList(1)); + List<List<Long>> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); JavaDStream count = stream.count(); JavaTestUtils.attachTestOutputStream(count); - List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3); assertOrderInvariantEquals(expected, result); } @@ -561,8 +561,8 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Integer>("new york", 5)), Arrays.asList(new Tuple2<String, Integer>("california", 14), new Tuple2<String, Integer>("new york", 9)), - Arrays.asList(new Tuple2<String, Integer>("california", 10), - new Tuple2<String, Integer>("new york", 4))); + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9))); JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -572,6 +572,9 @@ public class JavaAPISuite implements Serializable { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } for (Integer v: values) { out = out + v; } |