aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-01-26 11:55:28 +0000
committerSean Owen <sowen@cloudera.com>2016-01-26 11:55:28 +0000
commit649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
treecc500b373fda20ef42243c199ecfb6f381310abb /streaming/src/main
parent5936bf9fa85ccf7f0216145356140161c2801682 (diff)
downloadspark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable. CC rxin pwendell for API change; tdas since it also touches streaming. Author: Sean Owen <sowen@cloudera.com> Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala2
3 files changed, 6 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a791a474c6..f10de485d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -166,8 +166,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -176,8 +175,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -189,7 +187,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -202,7 +200,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1dfb4e7abc..db79eeab9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -550,7 +550,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
+ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 96a444a7ba..d60a617978 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
- flatMapFunc: T => Traversable[U]
+ flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)