From 2a872335c5c7b5481c927272447e4a344ef59dda Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 16 Jan 2013 13:17:45 -0800 Subject: Bug fix and test cleanup --- .../spark/streaming/api/java/JavaPairDStream.scala | 4 ++-- .../src/test/scala/spark/streaming/JavaAPISuite.java | 17 ++++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index 49a0f27b5b..1c5b864ff0 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -433,7 +433,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val cm: ClassManifest[S] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] - def scalaFunc(values: Seq[V], state: Option[S]): Option[S] = { + val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => { val list: JList[V] = values val scalaState: Optional[S] = state match { case Some(s) => Optional.of(s) @@ -445,7 +445,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( case _ => None } } - dstream.updateStateByKey(scalaFunc _) + dstream.updateStateByKey(scalaFunc) } def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index d95ab485f8..549fb5b733 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -52,15 +52,15 @@ public class JavaAPISuite implements Serializable { Arrays.asList(3,4,5), Arrays.asList(3)); - List> expected = Arrays.asList( - Arrays.asList(4), - Arrays.asList(3), - Arrays.asList(1)); + List> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); JavaDStream count = stream.count(); JavaTestUtils.attachTestOutputStream(count); - List> result = JavaTestUtils.runStreams(sc, 3, 3); + List> result = JavaTestUtils.runStreams(sc, 3, 3); assertOrderInvariantEquals(expected, result); } @@ -561,8 +561,8 @@ public class JavaAPISuite implements Serializable { new Tuple2("new york", 5)), Arrays.asList(new Tuple2("california", 14), new Tuple2("new york", 9)), - Arrays.asList(new Tuple2("california", 10), - new Tuple2("new york", 4))); + Arrays.asList(new Tuple2("california", 14), + new Tuple2("new york", 9))); JavaDStream> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); @@ -572,6 +572,9 @@ public class JavaAPISuite implements Serializable { @Override public Optional call(List values, Optional state) { int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } for (Integer v: values) { out = out + v; } -- cgit v1.2.3