From c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 12 Oct 2013 15:02:57 +0800 Subject: Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming --- .../java/org/apache/spark/streaming/JavaAPISuite.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..dc01f1e8aa 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap topics = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap kafkaParams = Maps.newHashMap(); - kafkaParams.put("zk.connect","localhost:12345"); - kafkaParams.put("groupid","consumer-group"); - JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream test3 = ssc.kafkaStream( + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, StorageLevel.MEMORY_AND_DISK()); } -- cgit v1.2.3 From 06664987990debcb4439a9dc26e1859508c601f5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 21 Oct 2013 05:34:09 -0700 Subject: Updated TransformDStream to allow n-ary DStream transform. Added transformWith, leftOuterJoin and rightOuterJoin operations to DStream for Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext for Scala and Java APIs. --- .../apache/spark/api/java/function/Function3.java | 38 ++++++++ .../spark/api/java/function/WrappedFunction3.scala | 34 +++++++ .../scala/org/apache/spark/streaming/DStream.scala | 38 +++++++- .../spark/streaming/PairDStreamFunctions.scala | 66 +++++++++++-- .../apache/spark/streaming/StreamingContext.scala | 13 ++- .../spark/streaming/api/java/JavaDStreamLike.scala | 79 +++++++++++++++- .../spark/streaming/api/java/JavaPairDStream.scala | 57 ++++++++++-- .../streaming/api/java/JavaStreamingContext.scala | 25 ++++- .../streaming/dstream/TransformedDStream.scala | 20 ++-- .../org/apache/spark/streaming/JavaAPISuite.java | 89 +++++++++++++++++- .../spark/streaming/BasicOperationsSuite.scala | 103 ++++++++++++++++++++- 11 files changed, 529 insertions(+), 33 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/api/java/function/Function3.java create mode 100644 core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala (limited to 'streaming/src/test/java') diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java new file mode 100644 index 0000000000..530ee2ea79 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import scala.reflect.ClassManifest; +import scala.reflect.ClassManifest$; +import scala.runtime.AbstractFunction2; + +import java.io.Serializable; + +/** + * A two-argument function that takes arguments of type T1 and T2 and returns an R. + */ +public abstract class Function3 extends WrappedFunction3 + implements Serializable { + + public abstract R call(T1 t1, T2 t2, T3 t3) throws Exception; + + public ClassManifest returnType() { + return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); + } +} + diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala new file mode 100644 index 0000000000..8e8bbeb998 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function + +import scala.runtime.AbstractFunction3 + +/** + * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the + * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply + * isn't marked to allow that). + */ +private[spark] abstract class WrappedFunction3[T1, T2, T3, R] + extends AbstractFunction3[T1, T2, T3, R] { + @throws(classOf[Exception]) + def call(t1: T1, t2: T2, t3: T3): R + + final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3) +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 80da6bd30b..ee351daa60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] ( * on each RDD of this DStream. 
*/ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => transformFunc(r)) + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } /** @@ -508,7 +508,41 @@ abstract class DStream[T: ClassManifest] ( * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + val cleanedF = context.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 1) + cleanedF(rdds.head.asInstanceOf[RDD[T]], time) + } + new TransformedDStream[U](Seq(this), realTransformFunc) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function on RDDs + * of DStreams stream1 and stream2. + */ + def transformWith[U: ClassManifest, V: ClassManifest]( + other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function on RDDs + * of DStreams stream1 and stream2. + */ + def transformWith[U: ClassManifest, V: ClassManifest]( + other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 2) + val rdd1 = rdds(0).asInstanceOf[RDD[T]] + val rdd2 = rdds(1).asInstanceOf[RDD[U]] + cleanedF(rdd1, rdd2, time) + } + new TransformedDStream[V](Seq(this, other), realTransformFunc) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 757bc98981..c319433e54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -444,27 +444,73 @@ extends Serializable { } /** - * Join `this` DStream with `other` DStream. HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner()) } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. 
*/ def join[W: ClassManifest]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (V, W))] = { - this.cogroup(other, partitioner) - .flatMapValues{ - case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) - } + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) + ) + } + + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def leftOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, Option[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) + ) + } + + /** + * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Option[V], W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) + ) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 098081d245..3217ef4581 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -455,12 +455,23 @@ class StreamingContext private ( } /** - * Create a unified DStream from multiple DStreams of the same type and same interval + * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { new UnionDStream[T](streams.toArray) } + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. + */ + def transform[T: ClassManifest]( + streams: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[T] + ): DStream[T] = { + new TransformedDStream[T](streams, sparkContext.clean(transformFunc)) + } + /** * Register an input stream that will be started (InputDStream.start() called) to get the * input data. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 459695b7ca..1110d770c4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -24,7 +24,8 @@ import scala.collection.JavaConversions._ import org.apache.spark.streaming._ import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} -import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function3 => JFunction3, _} import java.util import org.apache.spark.rdd.RDD import JavaDStream._ @@ -306,6 +307,82 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T dstream.transform(scalaTransform(_, _)) } + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[U, W]( + other: JavaDStream[U], + transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]] + ): JavaDStream[W] = { + implicit val cmu: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cmv: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[U, K2, V2]( + other: JavaDStream[U], + transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]] + ): JavaPairDStream[K2, V2] = { + implicit val cmu: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cmk2: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv2: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. 
+ */ + def transformWith[K, V, W]( + other: JavaPairDStream[K, V], + transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]] + ): JavaDStream[W] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmw: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this and other DStreams. + */ + def transformWith[K, V, K2, V2]( + other: JavaPairDStream[K, V], + transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]] + ): JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk2: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv2: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd + dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _)) + } + /** * Enable periodic checkpointing of RDDs of this DStream * @param interval Time interval after which generated RDD will be checkpointed diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 978fca33ad..821db46fff 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -36,7 +36,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( - implicit val kManifiest: ClassManifest[K], + implicit val kManifest: ClassManifest[K], implicit val vManifest: ClassManifest[V]) extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { @@ -499,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Join `this` DStream with `other` DStream. HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { implicit val cm: ClassManifest[W] = @@ -509,9 +509,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. 
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) : JavaPairDStream[K, (V, W)] = { @@ -520,6 +519,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.join(other.dstream, partitioner) } + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ + def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.leftOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ + def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.rightOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
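A minimal Java usage sketch of the leftOuterJoin/rightOuterJoin variants added to JavaPairDStream above (sketch only, not part of the patch; the class name, socket sources, and keying function are assumptions):

import com.google.common.base.Optional;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class OuterJoinSketch {
  public static void main(String[] args) {
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "OuterJoinSketch", new Duration(1000));

    // Key each incoming line by its first character (arbitrary choice for the sketch).
    PairFunction<String, String, String> keyByFirstChar =
        new PairFunction<String, String, String>() {
          @Override
          public Tuple2<String, String> call(String s) {
            String key = s.isEmpty() ? "" : s.substring(0, 1);
            return new Tuple2<String, String>(key, s);
          }
        };

    JavaPairDStream<String, String> left =
        jssc.socketTextStream("localhost", 9999).map(keyByFirstChar);
    JavaPairDStream<String, String> right =
        jssc.socketTextStream("localhost", 9998).map(keyByFirstChar);

    // Unmatched keys show up as Optional.absent() on the side that has no value.
    JavaPairDStream<String, Tuple2<String, Optional<String>>> lj = left.leftOuterJoin(right);
    JavaPairDStream<String, Tuple2<Optional<String>, String>> rj = left.rightOuterJoin(right);

    lj.print();
    rj.print();
    jssc.start();
  }
}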
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 54ba3e6025..405f715d50 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream -import java.util.{Map => JMap} +import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ @@ -584,6 +584,29 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) } + /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. + */ + def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { + val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) + implicit val cm: ClassManifest[T] = first.classManifest + ssc.union(dstreams)(cm) + } + + /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. + */ + def union[K, V]( + first: JavaPairDStream[K, V], + rest: JList[JavaPairDStream[K, V]] + ): JavaPairDStream[K, V] = { + val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) + implicit val cm: ClassManifest[(K, V)] = first.classManifest + implicit val kcm: ClassManifest[K] = first.kManifest + implicit val vcm: ClassManifest[V] = first.vManifest + new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) + } + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval. 
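A minimal Java sketch of the new JavaStreamingContext.union overloads added above, which take a first stream plus a java.util.List of the remaining ones (sketch only, not part of the patch; the class name and socket sources are assumptions):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class UnionSketch {
  public static void main(String[] args) {
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "UnionSketch", new Duration(1000));

    JavaDStream<String> s1 = jssc.socketTextStream("localhost", 9999);
    JavaDStream<String> s2 = jssc.socketTextStream("localhost", 9998);
    JavaDStream<String> s3 = jssc.socketTextStream("localhost", 9997);

    // Union the first stream with the rest in a single call; all streams must have
    // the same element type and the same slide duration.
    List<JavaDStream<String>> rest = Arrays.asList(s2, s3);
    JavaDStream<String> all = jssc.union(s1, rest);

    all.print();
    jssc.start();
  }
}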
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 60485adef9..71bcb2b390 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -21,16 +21,22 @@ import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} private[streaming] -class TransformedDStream[T: ClassManifest, U: ClassManifest] ( - parent: DStream[T], - transformFunc: (RDD[T], Time) => RDD[U] - ) extends DStream[U](parent.ssc) { +class TransformedDStream[U: ClassManifest] ( + parents: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[U] + ) extends DStream[U](parents.head.ssc) { - override def dependencies = List(parent) + require(parents.length > 0, "List of DStreams to transform is empty") + require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts") + require(parents.map(_.slideDuration).distinct.size == 1, + "Some of the DStreams have different slide durations") - override def slideDuration: Duration = parent.slideDuration + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq + Some(transformFunc(parentRDDs, validTime)) } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..9f885f07f2 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(mapped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable { } @Test + public void testTransformWith() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList(new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); + + + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", + new Tuple2("sharks", "ducks")), + new Tuple2>("new york", + new Tuple2("rangers", "islanders")))); + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream> joined = pairStream1.transformWith( + pairStream2, + new 
Function3, JavaPairRDD, Time, JavaPairRDD>>() { + @Override + public JavaPairRDD> call(JavaPairRDD stringStringJavaPairRDD, JavaPairRDD stringStringJavaPairRDD2, Time time) throws Exception { + return stringStringJavaPairRDD.join(stringStringJavaPairRDD2); + } + } + ); + + JavaTestUtils.attachTestOutputStream(joined); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + + } + + @Test public void testFlatMap() { List> inputData = Arrays.asList( Arrays.asList("go", "giants"), @@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream, List>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List, List>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable { JavaPairDStream> joined = pairStream1.join(pairStream2); JavaTestUtils.attachTestOutputStream(joined); - List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testLeftOuterJoin() { + List>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList(new Tuple2("california", "sharks") )); + + List>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2("california", "giants") ), + Arrays.asList(new Tuple2("new york", "islanders") ) + + ); + + List> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L)); + + JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream>> joined = pairStream1.leftOuterJoin(pairStream2); + JavaDStream counted = joined.count(); + JavaTestUtils.attachTestOutputStream(counted); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 11586f72b6..a2ac510a98 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import scala.runtime.RichInt + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { @@ -143,6 +146,72 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 8, 101 to 108, 201 to 208) + testOperation( + input, + (s: DStream[Int]) => s.union(s.map(_ + 4)) , + output + ) + } + + test("StreamingContext.union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + // union over 3 DStreams + testOperation( + input, + (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))), + output + ) + } + + test("transform") { + 
val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform + input.map(_.map(_.toString)) + ) + } + + test("transformWith") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, "x")), ("b", (1, "x")) ), + Seq( ("", (1, "x")) ), + Seq( ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + val t1 = s1.map(x => (x, 1)) + val t2 = s2.map(x => (x, "x")) + t1.transformWith( // RDD.join in transform + t2, + (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2) + ) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("StreamingContext.transform") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + + // transform over 3 DStreams by doing union of the 3 RDDs + val operation = (s: DStream[Int]) => { + s.context.transform( + Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams + (rdds: Seq[RDD[_]], time: Time) => + rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs + ) + } + + testOperation(input, operation, output) + } + test("cogroup") { val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) @@ -168,7 +237,37 @@ class BasicOperationsSuite extends TestSuiteBase { Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) + s1.map(x => (x, 1)).join(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("leftOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ), + Seq( ("", (1, Some("x"))), ("a", (1, None)) ), + Seq( ("", (1, None)) ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("rightOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ), + Seq( ("", (Some(1), "x")), ("b", (None, "x")) ), + Seq( ), + Seq( ("", (None, "x")) ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x"))) } testOperation(inputData1, inputData2, operation, outputData, true) } -- cgit v1.2.3 From 72d2e1dd777696640f64aaf92fecab64c6387df0 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 22 Oct 2013 23:35:51 -0700 Subject: Fixed bug in Java transformWith, added more Java testcases for transform and transformWith, added missing variations of Java join and cogroup, updated various Scala and Java API docs. 
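A short sketch of the Java transformWith call between two pair DStreams with its generic parameters written out, as exercised by the updated JavaAPISuite (sketch only; the helper method and its argument names are hypothetical, and the join inside call() is just one possible per-batch RDD function):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Hypothetical helper: joins the corresponding batches of two (String, String) pair DStreams.
static JavaPairDStream<String, Tuple2<String, String>> joinEachBatch(
    JavaPairDStream<String, String> pairStream1,
    JavaPairDStream<String, String> pairStream2) {
  return pairStream1.transformWith(
      pairStream2,
      new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time,
          JavaPairRDD<String, Tuple2<String, String>>>() {
        @Override
        public JavaPairRDD<String, Tuple2<String, String>> call(
            JavaPairRDD<String, String> rdd1,
            JavaPairRDD<String, String> rdd2,
            Time time) {
          // Any RDD-to-RDD logic can go here; join is what the test uses.
          return rdd1.join(rdd2);
        }
      });
}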
--- .../apache/spark/api/java/function/Function3.java | 2 +- .../spark/api/java/function/WrappedFunction3.scala | 4 +- .../scala/org/apache/spark/streaming/DStream.scala | 16 +- .../spark/streaming/PairDStreamFunctions.scala | 98 ++++++--- .../spark/streaming/api/java/JavaDStreamLike.scala | 64 +++--- .../spark/streaming/api/java/JavaPairDStream.scala | 142 +++++++++---- .../spark/streaming/dstream/CoGroupedDStream.scala | 58 ------ .../org/apache/spark/streaming/JavaAPISuite.java | 219 +++++++++++++++++++-- 8 files changed, 424 insertions(+), 179 deletions(-) delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala (limited to 'streaming/src/test/java') diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java index 530ee2ea79..2ce714cd0b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java @@ -24,7 +24,7 @@ import scala.runtime.AbstractFunction2; import java.io.Serializable; /** - * A two-argument function that takes arguments of type T1 and T2 and returns an R. + * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R. */ public abstract class Function3 extends WrappedFunction3 implements Serializable { diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala index 8e8bbeb998..d314dbdf1d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala @@ -20,8 +20,8 @@ package org.apache.spark.api.java.function import scala.runtime.AbstractFunction3 /** - * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the - * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply + * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the + * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply * isn't marked to allow that). */ private[spark] abstract class WrappedFunction3[T1, T2, T3, R] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index ee351daa60..38e34795b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -479,7 +479,7 @@ abstract class DStream[T: ClassManifest] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: RDD[T] => Unit) { this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) @@ -487,7 +487,7 @@ abstract class DStream[T: ClassManifest] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ def foreach(foreachFunc: (RDD[T], Time) => Unit) { val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) @@ -497,7 +497,7 @@ abstract class DStream[T: ClassManifest] ( /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) @@ -505,7 +505,7 @@ abstract class DStream[T: ClassManifest] ( /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) @@ -518,8 +518,8 @@ abstract class DStream[T: ClassManifest] ( } /** - * Return a new DStream in which each RDD is generated by applying a function on RDDs - * of DStreams stream1 and stream2. + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U: ClassManifest, V: ClassManifest]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] @@ -529,8 +529,8 @@ abstract class DStream[T: ClassManifest] ( } /** - * Return a new DStream in which each RDD is generated by applying a function on RDDs - * of DStreams stream1 and stream2. + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U: ClassManifest, V: ClassManifest]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index c319433e54..8c12fd11ef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} -import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} +import org.apache.spark.streaming.dstream.{ShuffledDStream} import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} import org.apache.spark.{Partitioner, HashPartitioner} @@ -359,7 +359,7 @@ extends Serializable { } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then @@ -398,11 +398,18 @@ extends Serializable { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) } - + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. 
+ */ def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { new MapValuedDStream[K, V, U](self, mapValuesFunc) } + /** + * Return a new DStream by applying a flatmap function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def flatMapValues[U: ClassManifest]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { @@ -410,9 +417,8 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { @@ -420,31 +426,29 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. */ def cogroup[W: ClassManifest]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Seq[V], Seq[W]))] = { - - val cgd = new CoGroupedDStream[K]( - Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]), - partitioner - ) - val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)( - classManifest[K], - Manifests.seqSeqManifest + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) ) - pdfs.mapValues { - case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) - } } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { @@ -452,7 +456,15 @@ extends Serializable { } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. 
*/ def join[W: ClassManifest]( @@ -466,7 +478,7 @@ extends Serializable { } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ @@ -475,7 +487,19 @@ extends Serializable { } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ @@ -490,7 +514,7 @@ extends Serializable { } /** - * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ @@ -499,7 +523,19 @@ extends Serializable { } /** - * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ @@ -514,8 +550,8 @@ extends Serializable { } /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, @@ -525,8 +561,8 @@ extends Serializable { } /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. 
The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles( prefix: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 1110d770c4..09189eadd8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -121,10 +121,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. */ - def glom(): JavaDStream[JList[T]] = + def glom(): JavaDStream[JList[T]] = { new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + } + - /** Return the StreamingContext associated with this DStream */ + /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */ def context(): StreamingContext = dstream.context() /** Return a new DStream by applying a function to all elements of this DStream. */ @@ -239,7 +241,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction[R, Void]) { dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) @@ -247,7 +249,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction2[R, Time, Void]) { dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) @@ -255,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = @@ -267,7 +269,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = @@ -279,7 +281,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { @@ -294,7 +296,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. 
*/ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { @@ -309,7 +311,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U, W]( other: JavaDStream[U], @@ -326,7 +328,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U, K2, V2]( other: JavaDStream[U], @@ -345,42 +347,42 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[K, V, W]( - other: JavaPairDStream[K, V], - transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]] + def transformWith[K2, V2, W]( + other: JavaPairDStream[K2, V2], + transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk2: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv2: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] implicit val cmw: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] - def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] = + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd - dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _)) + dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _)) } /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. 
*/ - def transformWith[K, V, K2, V2]( - other: JavaPairDStream[K, V], - transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]] - ): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + def transformWith[K2, V2, K3, V3]( + other: JavaPairDStream[K2, V2], + transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]] + ): JavaPairDStream[K3, V3] = { implicit val cmk2: ClassManifest[K2] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] implicit val cmv2: ClassManifest[V2] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] - def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] = + implicit val cmk3: ClassManifest[K3] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]] + implicit val cmv3: ClassManifest[V3] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd - dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _)) + dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _)) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 821db46fff..309c0fa24b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3} import org.apache.spark.Partitioner import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} @@ -148,7 +148,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[PairRDDFunctions]] for more + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more * information. */ def combineByKey[C](createCombiner: JFunction[V, C], @@ -413,7 +413,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param updateFunc State update function. 
If `this` function returns None, then @@ -428,7 +428,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -436,15 +436,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then @@ -452,19 +454,30 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } + + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] dstream.mapValues(f) } + /** + * Return a new DStream by applying a flatmap function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala @@ -474,9 +487,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { @@ -486,20 +498,35 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroup `this` DStream with `other` DStream. 
For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (JList[V], JList[W])] = { + def cogroup[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream, numPartitions) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (JList[V], JList[W])] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] dstream.cogroup(other.dstream, partitioner) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { @@ -509,19 +536,32 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream, numPartitions) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (V, W)] = { + def join[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, W)] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] dstream.join(other.dstream, partitioner) } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. 
*/ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { implicit val cm: ClassManifest[W] = @@ -531,11 +571,28 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (V, Optional[W])] = { + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, Optional[W])] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) @@ -543,8 +600,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. */ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { implicit val cm: ClassManifest[W] = @@ -554,11 +612,29 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. 
*/ - def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (Optional[V], W)] = { + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (Optional[V], W)] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) @@ -640,9 +716,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } object JavaPairDStream { - implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) - :JavaPairDStream[K, V] = + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = { new JavaPairDStream[K, V](dstream) + } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { implicit val cmk: ClassManifest[K] = diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala deleted file mode 100644 index 4eddc755b9..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.CoGroupedRDD -import org.apache.spark.streaming.{Time, DStream, Duration} - -private[streaming] -class CoGroupedDStream[K : ClassManifest]( - parents: Seq[DStream[(K, _)]], - partitioner: Partitioner - ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideDuration).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideDuration: Duration = parents.head.slideDuration - - override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { - val part = partitioner - val rdds = parents.flatMap(_.getOrCompute(validTime)) - if (rdds.size > 0) { - val q = new CoGroupedRDD[K](rdds, part) - Some(q) - } else { - None - } - } - -} diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 9f885f07f2..16622a3459 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -320,23 +320,104 @@ public class JavaAPISuite implements Serializable { Arrays.asList(9,10,11)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = - stream.transform(new Function, JavaRDD>() { - @Override - public JavaRDD call(JavaRDD in) throws Exception { - return in.map(new Function() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); + JavaDStream transformed = stream.transform( + new Function, JavaRDD>() { + @Override + public JavaRDD call(JavaRDD in) throws Exception { + return in.map(new Function() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed); List> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); } + @Test + public void testVariousTransform() { + // tests whether all variations of transform can be called from Java + + List> inputData = Arrays.asList(Arrays.asList(1)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + + List>> pairInputData = + Arrays.asList(Arrays.asList(new Tuple2("x", 1))); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); + + JavaDStream transformed1 = stream.transform( + new Function, JavaRDD>() { + @Override public JavaRDD call(JavaRDD in) throws Exception { + return null; + } + } + ); + + JavaDStream transformed2 = stream.transform( + new Function2, Time, JavaRDD>() { + @Override public JavaRDD call(JavaRDD in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream transformed3 = stream.transform( + new Function, JavaPairRDD>() { + @Override public JavaPairRDD call(JavaRDD in) throws Exception { + return null; + } + } + ); + + JavaPairDStream transformed4 = stream.transform( + new Function2, Time, JavaPairRDD>() { + @Override public JavaPairRDD call(JavaRDD in, Time time) throws Exception 
{ + return null; + } + } + ); + + JavaDStream pairTransformed1 = pairStream.transform( + new Function, JavaRDD>() { + @Override public JavaRDD call(JavaPairRDD in) throws Exception { + return null; + } + } + ); + + JavaDStream pairTransformed2 = pairStream.transform( + new Function2, Time, JavaRDD>() { + @Override public JavaRDD call(JavaPairRDD in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream pairTransformed3 = pairStream.transform( + new Function, JavaPairRDD>() { + @Override public JavaPairRDD call(JavaPairRDD in) throws Exception { + return null; + } + } + ); + + JavaPairDStream pairTransformed4 = pairStream.transform( + new Function2, Time, JavaPairRDD>() { + @Override public JavaPairRDD call(JavaPairRDD in, Time time) throws Exception { + return null; + } + } + ); + + } + @Test public void testTransformWith() { List>> stringStringKVStream1 = Arrays.asList( @@ -374,10 +455,18 @@ public class JavaAPISuite implements Serializable { JavaPairDStream> joined = pairStream1.transformWith( pairStream2, - new Function3, JavaPairRDD, Time, JavaPairRDD>>() { - @Override - public JavaPairRDD> call(JavaPairRDD stringStringJavaPairRDD, JavaPairRDD stringStringJavaPairRDD2, Time time) throws Exception { - return stringStringJavaPairRDD.join(stringStringJavaPairRDD2); + new Function3 < + JavaPairRDD, + JavaPairRDD, + Time, + JavaPairRDD> + >() { + @Override public JavaPairRDD> call( + JavaPairRDD rdd1, + JavaPairRDD rdd2, + Time time + ) throws Exception { + return rdd1.join(rdd2); } } ); @@ -389,6 +478,106 @@ public class JavaAPISuite implements Serializable { } + @Test + public void testVariousTransformWith() { + // tests whether all variations of transformWith can be called from Java + + List> inputData1 = Arrays.asList(Arrays.asList(1)); + List> inputData2 = Arrays.asList(Arrays.asList("x")); + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); + + List>> pairInputData1 = + Arrays.asList(Arrays.asList(new Tuple2("x", 1))); + List>> pairInputData2 = + Arrays.asList(Arrays.asList(new Tuple2(1.0, 'x'))); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); + JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); + + JavaDStream transformed1 = stream1.transformWith( + stream2, + new Function3, JavaRDD, Time, JavaRDD>() { + @Override + public JavaRDD call(JavaRDD rdd1, JavaRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream transformed2 = stream1.transformWith( + pairStream1, + new Function3, JavaPairRDD, Time, JavaRDD>() { + @Override + public JavaRDD call(JavaRDD rdd1, JavaPairRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream transformed3 = stream1.transformWith( + stream2, + new Function3, JavaRDD, Time, JavaPairRDD>() { + @Override + public JavaPairRDD call(JavaRDD rdd1, JavaRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream transformed4 = stream1.transformWith( + pairStream1, + new Function3, JavaPairRDD, Time, JavaPairRDD>() { + @Override + public JavaPairRDD call(JavaRDD rdd1, JavaPairRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream pairTransformed1 = pairStream1.transformWith( + stream2, + new Function3, JavaRDD, Time, JavaRDD>() { + @Override + public JavaRDD 
call(JavaPairRDD rdd1, JavaRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream pairTransformed2_ = pairStream1.transformWith( + pairStream1, + new Function3, JavaPairRDD, Time, JavaRDD>() { + @Override + public JavaRDD call(JavaPairRDD rdd1, JavaPairRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream pairTransformed3 = pairStream1.transformWith( + stream2, + new Function3, JavaRDD, Time, JavaPairRDD>() { + @Override + public JavaPairRDD call(JavaPairRDD rdd1, JavaRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + + + JavaPairDStream pairTransformed4 = pairStream1.transformWith( + pairStream2, + new Function3, JavaPairRDD, Time, JavaPairRDD>() { + @Override + public JavaPairRDD call(JavaPairRDD rdd1, JavaPairRDD rdd2, Time time) throws Exception { + return null; + } + } + ); + } + @Test public void testFlatMap() { List> inputData = Arrays.asList( -- cgit v1.2.3 From bacfe5ebca8e82317a7596c9fcbf95331c7038a9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Oct 2013 10:56:24 -0700 Subject: Added JavaStreamingContext.transform --- .../org/apache/spark/api/java/JavaPairRDD.scala | 11 ++ .../apache/spark/streaming/StreamingContext.scala | 4 +- .../spark/streaming/api/java/JavaPairDStream.scala | 5 + .../streaming/api/java/JavaStreamingContext.scala | 50 +++++++- .../org/apache/spark/streaming/JavaAPISuite.java | 132 ++++++++++++++++----- 5 files changed, 169 insertions(+), 33 deletions(-) (limited to 'streaming/src/test/java') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index a6518abf45..c099ca77b9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -598,4 +598,15 @@ object JavaPairRDD { new JavaPairRDD[K, V](rdd) implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd + + + /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */ + def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairRDD[K, V](rdd.rdd) + } + } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 3c466ade93..70bc25070a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -474,10 +474,10 @@ class StreamingContext private ( * the DStreams. 
*/ def transform[T: ClassManifest]( - streams: Seq[DStream[_]], + dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] ): DStream[T] = { - new TransformedDStream[T](streams, sparkContext.clean(transformFunc)) + new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 309c0fa24b..4dd6b7d096 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -711,6 +711,11 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } + /** Convert to a JavaDStream */ + def toJavaDStream(): JavaDStream[(K, V)] = { + new JavaDStream[(K, V)](dstream) + } + override val classManifest: ClassManifest[(K, V)] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index a4b1670cd4..cf30b541e1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -33,7 +33,7 @@ import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy} @@ -616,6 +616,54 @@ class JavaStreamingContext(val ssc: StreamingContext) { new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) } + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[T]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]] + ): JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. 
The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[K, V]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] + ): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval. diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 2f92421367..f588afe90c 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.io.Files; import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -292,8 +293,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9)); JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); - JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3)); + JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6)); JavaRDD rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); LinkedList> rdds = Lists.newLinkedList(); @@ -331,7 +332,6 @@ public class JavaAPISuite implements Serializable { } }); } - } ); JavaTestUtils.attachTestOutputStream(transformed); @@ -354,7 +354,8 @@ public class JavaAPISuite implements Serializable { JavaDStream transformed1 = stream.transform( new Function, JavaRDD>() { - @Override public JavaRDD call(JavaRDD in) throws Exception { + @Override + public JavaRDD call(JavaRDD in) throws Exception { return null; } } @@ -421,51 +422,56 @@ public class JavaAPISuite implements Serializable { @Test public void testTransformWith() { List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("new york", "yankees")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("new york", "rangers"))); + Arrays.asList( + new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList( + new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2("california", 
"giants"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "ducks"), - new Tuple2("new york", "islanders"))); + Arrays.asList( + new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList( + new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", - new Tuple2("dodgers", "giants")), - new Tuple2>("new york", - new Tuple2("yankees", "mets"))), - Arrays.asList( - new Tuple2>("california", - new Tuple2("sharks", "ducks")), - new Tuple2>("new york", - new Tuple2("rangers", "islanders")))); + Arrays.asList( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", + new Tuple2("sharks", "ducks")), + new Tuple2>("new york", + new Tuple2("rangers", "islanders")))); JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); + ssc, stringStringKVStream1, 1); JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); + ssc, stringStringKVStream2, 1); JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); JavaPairDStream> joined = pairStream1.transformWith( pairStream2, - new Function3 < + new Function3< JavaPairRDD, JavaPairRDD, Time, JavaPairRDD> >() { - @Override public JavaPairRDD> call( + @Override + public JavaPairRDD> call( JavaPairRDD rdd1, JavaPairRDD rdd2, Time time - ) throws Exception { + ) throws Exception { return rdd1.join(rdd2); } } @@ -475,9 +481,9 @@ public class JavaAPISuite implements Serializable { List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); - } + @Test public void testVariousTransformWith() { // tests whether all variations of transformWith can be called from Java @@ -566,7 +572,6 @@ public class JavaAPISuite implements Serializable { } ); - JavaPairDStream pairTransformed4 = pairStream1.transformWith( pairStream2, new Function3, JavaPairRDD, Time, JavaPairRDD>() { @@ -578,7 +583,74 @@ public class JavaAPISuite implements Serializable { ); } - @Test + @Test + public void testStreamingContextTransform(){ + List> stream1input = Arrays.asList( + Arrays.asList(1), + Arrays.asList(2) + ); + + List> stream2input = Arrays.asList( + Arrays.asList(3), + Arrays.asList(4) + ); + + List>> pairStream1input = Arrays.asList( + Arrays.asList(new Tuple2(1, "x")), + Arrays.asList(new Tuple2(2, "y")) + ); + + List>>> expected = Arrays.asList( + Arrays.asList(new Tuple2>(1, new Tuple2(1, "x"))), + Arrays.asList(new Tuple2>(2, new Tuple2(2, "y"))) + ); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); + + List> listOfDStreams1 = Arrays.>asList(stream1, stream2); + + // This is just to test whether this transform to JavaStream compiles + JavaDStream transformed1 = ssc.transform( + listOfDStreams1, + new Function2>, Time, JavaRDD>() { + public JavaRDD call(List> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 2); + return null; + } + } + ); + + List> listOfDStreams2 = + Arrays.>asList(stream1, stream2, pairStream1.toJavaDStream()); + + JavaPairDStream> transformed2 = ssc.transform( + 
listOfDStreams2, + new Function2>, Time, JavaPairRDD>>() { + public JavaPairRDD> call(List> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 3); + JavaRDD rdd1 = (JavaRDD)listOfRDDs.get(0); + JavaRDD rdd2 = (JavaRDD)listOfRDDs.get(1); + JavaRDD> rdd3 = (JavaRDD>)listOfRDDs.get(2); + JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction mapToTuple = new PairFunction() { + @Override + public Tuple2 call(Integer i) throws Exception { + return new Tuple2(i, i); + } + }; + return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed2); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + Assert.assertEquals(expected, result); + } + + @Test public void testFlatMap() { List> inputData = Arrays.asList( Arrays.asList("go", "giants"), -- cgit v1.2.3 From e962a6e6ee8d8ef9d1245d85616fe50554f7f689 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Oct 2013 15:17:26 -0700 Subject: Fixed accidental bug. --- streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index f588afe90c..5d48908667 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -332,7 +332,7 @@ public class JavaAPISuite implements Serializable { } }); } - ); + }); JavaTestUtils.attachTestOutputStream(transformed); List> result = JavaTestUtils.runStreams(ssc, 3, 3); -- cgit v1.2.3 From 39f6f75588b69f07cd963c5e211045fed103695b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 16:43:33 -0700 Subject: Some clean-up of tests --- .../test/java/org/apache/spark/streaming/JavaTestUtils.scala | 3 +-- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- .../test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 10 +++++++--- 3 files changed, 10 insertions(+), 7 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 8a6604904d..5344ae7682 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -54,8 +54,7 @@ trait JavaTestBase extends TestSuiteBase { { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - val ostream = new TestOutputStream(dstream.dstream, - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index a327de80b3..beb20831bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -366,7 +366,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] - 
outputStream.output + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]] + outputStream.output.map(_.flatten) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 26f515a778..be140699c2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -63,7 +63,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ * * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) +class TestOutputStream[T: ClassManifest](parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected @@ -82,9 +83,10 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. * * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each - * containing a sequnce of items. + * containing a sequence of items. */ -class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]]) +class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], + val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) output += collected @@ -96,6 +98,8 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val o ois.defaultReadObject() output.clear() } + + def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten)) } /** -- cgit v1.2.3 From 31e92b72e31910be1694c348ab5de8b14f2df44b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 21:14:56 -0700 Subject: Adding Java versions and associated tests --- .../org/apache/spark/api/java/JavaDoubleRDD.scala | 11 ++++++++ .../org/apache/spark/api/java/JavaPairRDD.scala | 11 ++++++++ .../scala/org/apache/spark/api/java/JavaRDD.scala | 11 ++++++++ core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../test/scala/org/apache/spark/JavaAPISuite.java | 21 ++++++++++++++ .../spark/streaming/api/java/JavaDStream.scala | 6 ++++ .../spark/streaming/api/java/JavaPairDStream.scala | 6 ++++ .../org/apache/spark/streaming/JavaAPISuite.java | 33 ++++++++++++++++++++++ .../org/apache/spark/streaming/JavaTestUtils.scala | 23 +++++++++++++++ 9 files changed, 123 insertions(+), 1 deletion(-) (limited to 'streaming/src/test/java') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index f9b6ee351a..043cb183ba 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -93,6 +93,17 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions, shuffle)) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. 
Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions)) + /** * Return an RDD with the elements from `this` that are not in `other`. * diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 268f43b4e8..39f408b8c8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -107,6 +107,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions, shuffle)) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions)) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 662990049b..3b359a8fd6 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -81,6 +81,17 @@ JavaRDDLike[T, JavaRDD[T]] { def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] = rdd.coalesce(numPartitions, shuffle) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 17bc2515f2..6e88be6f6a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -268,7 +268,7 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD that has exactly numPartitions partitions. * - * Used to increase or decrease the level of parallelism in this RDD. This will use + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. 
* * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 7b0bb89ab2..f38c607d65 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -472,6 +472,27 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); } + @Test + public void repartition() { + // Shrinking number of partitions + JavaRDD in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2); + JavaRDD repartitioned1 = in1.repartition(4); + List> result1 = repartitioned1.glom().collect(); + Assert.assertEquals(4, result1.size()); + for (List l: result1) { + Assert.assertTrue(l.size() > 0); + } + + // Growing number of partitions + JavaRDD in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4); + JavaRDD repartitioned2 = in2.repartition(2); + List> result2 = repartitioned2.glom().collect(); + Assert.assertEquals(2, result2.size()); + for (List l: result2) { + Assert.assertTrue(l.size() > 0); + } + } + @Test public void persist() { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index d1932b6b05..1a2aeaa879 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -94,6 +94,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM */ def union(that: JavaDStream[T]): JavaDStream[T] = dstream.union(that.dstream) + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions) } object JavaDStream { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 978fca33ad..faf8f36182 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -59,6 +59,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. 
+ */ + def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions) + /** Method that generates a RDD for the given Duration */ def compute(validTime: Time): JavaPairRDD[K, V] = { dstream.compute(validTime) match { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index dc01f1e8aa..5a9836a415 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -183,6 +183,39 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } + @Test + public void testRepartitionMorePartitions() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4,5,6,7,8,9,10), + Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStream repartitioned = stream.repartition(4); + JavaTestUtils.attachTestOutputStream(repartitioned); + List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for ( List> rdd : result) { + Assert.assertEquals(4, rdd.size()); + Assert.assertEquals( + 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); + } + } + + @Test + public void testRepartitionFewerPartitions() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4,5,6,7,8,9,10), + Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStream repartitioned = stream.repartition(2); + JavaTestUtils.attachTestOutputStream(repartitioned); + List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for ( List> rdd : result) { + Assert.assertEquals(2, rdd.size()); + Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); + } + } + @Test public void testGlom() { List> inputData = Arrays.asList( diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 5344ae7682..780f7b823b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -62,6 +62,8 @@ trait JavaTestBase extends TestSuiteBase { * Process all registered streams for a numBatches batches, failing if * numExpectedOutput RDD's are not generated. Generated RDD's are collected * and returned, represented as a list for each batch interval. + * + * Returns a list of items for each RDD. */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { @@ -72,6 +74,27 @@ trait JavaTestBase extends TestSuiteBase { res.map(entry => out.append(new ArrayList[V](entry))) out } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. 
+ */ + def runStreamsWithPartitions[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[JList[V]]]() + res.map(entry => { + val lists = entry.map(new ArrayList[V](_)) + out.append(new ArrayList[JList[V]](lists)) + }) + out + } } object JavaTestUtils extends JavaTestBase { -- cgit v1.2.3 From a351fd4aeda7d137e4cece705e25b51d7634ca63 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 21:16:30 -0700 Subject: Small spacing fix --- streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 5a9836a415..391d7ba21d 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -193,7 +193,7 @@ public class JavaAPISuite implements Serializable { JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); - for ( List> rdd : result) { + for (List> rdd : result) { Assert.assertEquals(4, rdd.size()); Assert.assertEquals( 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); @@ -210,7 +210,7 @@ public class JavaAPISuite implements Serializable { JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); - for ( List> rdd : result) { + for (List> rdd : result) { Assert.assertEquals(2, rdd.size()); Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); } -- cgit v1.2.3 From e5f6d5697b43ac89a50fb791f4b284409e75b1f4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 22:08:06 -0700 Subject: Spacing fix --- core/src/test/scala/org/apache/spark/JavaAPISuite.java | 4 ++-- .../src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'streaming/src/test/java') diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index f38c607d65..352036f182 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -475,7 +475,7 @@ public class JavaAPISuite implements Serializable { @Test public void repartition() { // Shrinking number of partitions - JavaRDD in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2); + JavaRDD in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2); JavaRDD repartitioned1 = in1.repartition(4); List> result1 = repartitioned1.glom().collect(); Assert.assertEquals(4, result1.size()); @@ -484,7 +484,7 @@ public class JavaAPISuite implements Serializable { } // Growing number of partitions - JavaRDD in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4); + JavaRDD in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4); JavaRDD repartitioned2 = in2.repartition(2); List> result2 = repartitioned2.glom().collect(); Assert.assertEquals(2, result2.size()); diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 391d7ba21d..9da8adda83 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -186,8 +186,8 @@ public class JavaAPISuite implements Serializable { @Test public void testRepartitionMorePartitions() { List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4,5,6,7,8,9,10), - Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); JavaDStream repartitioned = stream.repartition(4); JavaTestUtils.attachTestOutputStream(repartitioned); @@ -203,8 +203,8 @@ public class JavaAPISuite implements Serializable { @Test public void testRepartitionFewerPartitions() { List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4,5,6,7,8,9,10), - Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); JavaDStream repartitioned = stream.repartition(2); JavaTestUtils.attachTestOutputStream(repartitioned); -- cgit v1.2.3 From ad5f579cbfb3f0c9834cc5fdd26ad0745f5f8abf Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 22:18:53 -0700 Subject: Style fixes --- .../org/apache/spark/streaming/JavaTestUtils.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 780f7b823b..5e384eeee4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -33,9 +33,9 @@ trait JavaTestBase extends TestSuiteBase { * The stream will be derived from the supplied lists of Java objects. **/ def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) implicit val cm: ClassManifest[T] = @@ -50,7 +50,7 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] @@ -66,7 +66,7 @@ trait JavaTestBase extends TestSuiteBase { * Returns a list of items for each RDD. */ def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassManifest[V] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) @@ -83,16 +83,16 @@ trait JavaTestBase extends TestSuiteBase { * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * representing one partition. 
*/ - def runStreamsWithPartitions[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { + def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, + numExpectedOutput: Int): JList[JList[JList[V]]] = { implicit val cm: ClassManifest[V] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[JList[V]]]() - res.map(entry => { + res.map{entry => val lists = entry.map(new ArrayList[V](_)) out.append(new ArrayList[JList[V]](lists)) - }) + } out } } -- cgit v1.2.3
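
The sketches below are not part of the patches above; they illustrate, from an application's point of view, the Java-facing operations those patches add. All class names, hosts, ports, and record formats are hypothetical stand-ins.

First, a minimal sketch of leftOuterJoin and rightOuterJoin on JavaPairDStream, assuming two socket sources that emit "key,count" lines:

    import com.google.common.base.Optional;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    public class OuterJoinSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[3]", "OuterJoinSketch", new Duration(1000));

        // Two hypothetical sources of "key,count" records.
        JavaPairDStream<String, Integer> clicks = toPairs(ssc.socketTextStream("localhost", 9999));
        JavaPairDStream<String, Integer> views  = toPairs(ssc.socketTextStream("localhost", 9998));

        // Keys seen only in 'clicks' get Optional.absent() on the right side of the tuple.
        JavaPairDStream<String, Tuple2<Integer, Optional<Integer>>> left =
            clicks.leftOuterJoin(views);

        // Same idea on the other side, with an explicit partition count.
        JavaPairDStream<String, Tuple2<Optional<Integer>, Integer>> right =
            clicks.rightOuterJoin(views, 4);

        left.print();
        right.print();
        ssc.start();
      }

      // Helper: parse each "key,count" line into a (key, count) pair stream.
      private static JavaPairDStream<String, Integer> toPairs(JavaDStream<String> lines) {
        JavaDStream<Tuple2<String, Integer>> tuples = lines.map(
            new Function<String, Tuple2<String, Integer>>() {
              @Override
              public Tuple2<String, Integer> call(String line) throws Exception {
                String[] parts = line.split(",");
                return new Tuple2<String, Integer>(parts[0], Integer.parseInt(parts[1]));
              }
            });
        return JavaPairDStream.fromJavaDStream(tuples);
      }
    }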
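
transformWith exposes arbitrary per-batch RDD logic between two streams. Below is one way it might be used, written as a method that could be dropped into the class above (same imports, plus org.apache.spark.api.java.JavaPairRDD, org.apache.spark.api.java.function.Function3, and org.apache.spark.streaming.Time); the join-and-sum body is made-up example logic, not anything prescribed by the API:

    // For each batch, join the two pair RDDs and add the matched counts together.
    static JavaPairDStream<String, Integer> joinAndSum(
        JavaPairDStream<String, Integer> clicks,
        JavaPairDStream<String, Integer> views) {
      return clicks.transformWith(
          views,
          new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time,
              JavaPairRDD<String, Integer>>() {
            @Override
            public JavaPairRDD<String, Integer> call(
                JavaPairRDD<String, Integer> clickBatch,
                JavaPairRDD<String, Integer> viewBatch,
                Time batchTime) throws Exception {
              // batchTime is available here if the per-batch logic needs it.
              return clickBatch.join(viewBatch).mapValues(
                  new Function<Tuple2<Integer, Integer>, Integer>() {
                    @Override
                    public Integer call(Tuple2<Integer, Integer> counts) throws Exception {
                      return counts._1() + counts._2();
                    }
                  });
            }
          });
    }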
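
JavaStreamingContext.transform generalizes this to a whole list of streams. A self-contained sketch that merges two hypothetical text streams batch by batch; the function receives the per-batch RDDs in the same order as the DStream list and has to cast them back to their concrete types:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class MultiStreamTransformSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[3]", "MultiStreamTransformSketch", new Duration(1000));

        // Two hypothetical text sources.
        JavaDStream<String> lines1 = ssc.socketTextStream("localhost", 9999);
        JavaDStream<String> lines2 = ssc.socketTextStream("localhost", 9998);

        JavaDStream<String> merged = ssc.transform(
            Arrays.<JavaDStream<?>>asList(lines1, lines2),
            new Function2<List<JavaRDD<?>>, Time, JavaRDD<String>>() {
              @Override
              @SuppressWarnings("unchecked")
              public JavaRDD<String> call(List<JavaRDD<?>> rdds, Time time) throws Exception {
                // rdds.get(0) comes from lines1 and rdds.get(1) from lines2.
                JavaRDD<String> batch1 = (JavaRDD<String>) rdds.get(0);
                JavaRDD<String> batch2 = (JavaRDD<String>) rdds.get(1);
                return batch1.union(batch2);
              }
            });

        merged.print();
        ssc.start();
      }
    }

A JavaPairDStream can be included in the list via toJavaDStream(), and its per-batch RDD turned back into a JavaPairRDD with JavaPairRDD.fromJavaRDD, as the updated JavaAPISuite test does.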
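
Finally, a small sketch of the repartition methods added for the Java API, once on a DStream and once on a plain JavaRDD; the partition counts are arbitrary:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class RepartitionSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "RepartitionSketch", new Duration(1000));

        // Hypothetical single-receiver source; each batch arrives with few partitions.
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);

        // Spread every batch over 8 partitions before any expensive downstream work.
        JavaDStream<String> rebalanced = lines.repartition(8);
        rebalanced.print();

        // The same operation on a plain JavaRDD, shrinking 4 partitions down to 2.
        JavaRDD<Integer> shrunk =
            ssc.sc().parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4).repartition(2);
        System.out.println("partition contents after repartition(2): " + shrunk.glom().collect());

        ssc.start();
      }
    }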