author     Sean Owen <sowen@cloudera.com>    2016-01-26 11:55:28 +0000
committer  Sean Owen <sowen@cloudera.com>    2016-01-26 11:55:28 +0000
commit     649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
tree       cc500b373fda20ef42243c199ecfb6f381310abb /streaming
parent     5936bf9fa85ccf7f0216145356140161c2801682 (diff)
download   spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz
           spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2
           spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix the Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not an Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.

CC rxin pwendell for the API change; tdas since it also touches streaming.

Author: Sean Owen <sowen@cloudera.com>

Closes #10413 from srowen/SPARK-3369.
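For Java callers, the practical effect of this change is that FlatMapFunction and PairFlatMapFunction implementations passed to flatMap, flatMapToPair, mapPartitions, and mapPartitionsToPair must now implement call() to return an Iterator rather than an Iterable. A minimal migration sketch follows; the JavaDStream<String> named `lines` is a hypothetical input stream and not part of this patch:

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.streaming.api.java.JavaDStream;

    class FlatMapMigrationSketch {
      // `lines` is a hypothetical JavaDStream<String> obtained elsewhere.
      static JavaDStream<String> splitWords(JavaDStream<String> lines) {
        return lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterator<String> call(String s) {
            // Before SPARK-3369 this method returned Iterable<String>, e.g.
            //   return Arrays.asList(s.split(" "));
            // With the new signature it must return an Iterator<String>:
            return Arrays.asList(s.split(" ")).iterator();
          }
        });
      }
    }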
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala  | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala           |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala |  2
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                | 20
4 files changed, 16 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a791a474c6..f10de485d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -166,8 +166,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -176,8 +175,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -189,7 +187,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -202,7 +200,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1dfb4e7abc..db79eeab9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -550,7 +550,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
+ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 96a444a7ba..d60a617978 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
- flatMapFunc: T => Traversable[U]
+ flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4dbcef2934..806cea24ca 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -271,12 +271,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> mapped = stream.mapPartitions(
new FlatMapFunction<Iterator<String>, String>() {
@Override
- public Iterable<String> call(Iterator<String> in) {
+ public Iterator<String> call(Iterator<String> in) {
StringBuilder out = new StringBuilder();
while (in.hasNext()) {
out.append(in.next().toUpperCase(Locale.ENGLISH));
}
- return Arrays.asList(out.toString());
+ return Arrays.asList(out.toString()).iterator();
}
});
JavaTestUtils.attachTestOutputStream(mapped);
@@ -759,8 +759,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split("(?!^)"));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split("(?!^)")).iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -846,12 +846,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
new PairFlatMapFunction<String, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(String in) {
+ public Iterator<Tuple2<Integer, String>> call(String in) {
List<Tuple2<Integer, String>> out = new ArrayList<>();
for (String letter: in.split("(?!^)")) {
out.add(new Tuple2<>(in.length(), letter));
}
- return out;
+ return out.iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -1019,13 +1019,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+ public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
List<Tuple2<Integer, String>> out = new LinkedList<>();
while (in.hasNext()) {
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
}
- return out;
+ return out.iterator();
}
});
@@ -1089,12 +1089,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+ public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
List<Tuple2<Integer, String>> out = new LinkedList<>();
for (Character s : in._1().toCharArray()) {
out.add(new Tuple2<>(in._2(), s.toString()));
}
- return out;
+ return out.iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);