author     Nick Pentreath <nick.pentreath@gmail.com>  2013-02-21 09:33:08 +0200
committer  Nick Pentreath <nick.pentreath@gmail.com>  2013-02-21 09:33:08 +0200
commit     16d456742e596cdf5ac870d44ea2b4f308ddebdc (patch)
tree       a0989a98143f53ecaae0e9ffa4d01dc306f1f7fa /streaming
parent     8a281399f970db761ea05baf07972fff1c5bd058 (diff)
parent     2921fa7d81be201e5d694ab58ade6233f397eef9 (diff)
Merge remote-tracking branch 'upstream/streaming' into streaming-eg-algebird
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala                        2
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala              2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala                 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala              5
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala         69
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala          9
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala     8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala          2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala    4
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                      233
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaTestUtils.scala                      5
11 files changed, 306 insertions(+), 43 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 7405c8b22e..e7a392fbbf 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -14,7 +14,7 @@ private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
- val framework = ssc.sc.jobName
+ val framework = ssc.sc.appName
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
val graph = ssc.graph
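
The rename above follows SparkContext, where jobName became appName; the checkpoint keeps storing it under its old field name, framework. A minimal Scala sketch of what the snapshot records (hypothetical values; Checkpoint is private[streaming], so this only compiles inside that package, and it assumes ssc.sc is reachable there):

    import spark.streaming.{Checkpoint, Seconds, StreamingContext, Time}

    val ssc = new StreamingContext("local[2]", "checkpoint-demo", Seconds(1))
    val cp = new Checkpoint(ssc, Time(0))
    // The application name is captured from the SparkContext at snapshot time.
    assert(cp.framework == ssc.sc.appName)
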
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 5a2dd46fa0..3ec922957d 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -411,7 +411,7 @@ extends Serializable {
): DStream[(K, (Seq[V], Seq[W]))] = {
val cgd = new CoGroupedDStream[K](
- Seq(self.asInstanceOf[DStream[(_, _)]], other.asInstanceOf[DStream[(_, _)]]),
+ Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
partitioner
)
val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
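
The cast now matches the tightened parent type of CoGroupedDStream (changed further down in this commit), so both inputs are pinned to the same key type K. A hedged sketch of the cogroup call this code path serves, assuming queueStream and a publicly reachable ssc.sc:

    import scala.collection.mutable.Queue
    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._  // enables pair operations such as cogroup

    val ssc = new StreamingContext("local[2]", "cogroup-demo", Seconds(1))
    val left  = ssc.queueStream(Queue(ssc.sc.makeRDD(Seq(("a", 1), ("b", 2)))))
    val right = ssc.queueStream(Queue(ssc.sc.makeRDD(Seq(("a", 1.0)))))
    // Keys must agree (String here); value types may differ on each side.
    val grouped = left.cogroup(right)  // DStream[(String, (Seq[Int], Seq[Double]))]
    grouped.print()
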
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index a9684c5772..d76ccfca4f 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -47,11 +47,11 @@ class StreamingContext private (
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param appName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(master: String, frameworkName: String, batchDuration: Duration) =
- this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
+ def this(master: String, appName: String, batchDuration: Duration) =
+ this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration)
/**
* Re-create a StreamingContext from a checkpoint file.
@@ -454,14 +454,14 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
- protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+ protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
MetadataCleaner.setDelaySeconds(3600)
}
- new SparkContext(master, frameworkName)
+ new SparkContext(master, appName)
}
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
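
At a call site the renamed parameter reads as follows, a sketch with hypothetical values:

    import spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(
      "local[4]",    // master: cluster URL, or local[n] for testing
      "word-count",  // appName: displayed on the cluster web UI
      Seconds(2))    // batchDuration: how often input is cut into batches
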
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 30985b4ebc..51efe6cae8 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import spark.storage.StorageLevel
+import spark.RDD
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -26,7 +27,9 @@ import spark.storage.StorageLevel
* - A function that is used to generate an RDD after each time interval
*/
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
- extends JavaDStreamLike[T, JavaDStream[T]] {
+ extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+
+ override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
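
wrapRDD is the hook behind the trait's new third type parameter: the shared JavaDStreamLike methods ask each concrete class how to wrap a Scala RDD, so they can return JavaRDD[T] here and JavaPairRDD[K, V] in JavaPairDStream. A self-contained Scala analogue of the pattern, with simplified names and no Spark types:

    // The trait leaves the wrapped type R open; inherited methods return it.
    trait StreamLike[T, This <: StreamLike[T, This, R], R] {
      def wrap(in: Seq[T]): R
      def firstBatch(batches: Seq[Seq[T]]): R = wrap(batches.head)
    }

    // A plain stream wraps batches as List[T]...
    class PlainStream[T] extends StreamLike[T, PlainStream[T], List[T]] {
      override def wrap(in: Seq[T]): List[T] = in.toList
    }

    // ...while a pair stream wraps the same batches as Map[K, V].
    class PairStream[K, V] extends StreamLike[(K, V), PairStream[K, V], Map[K, V]] {
      override def wrap(in: Seq[(K, V)]): Map[K, V] = in.toMap
    }

Calling firstBatch on a PlainStream yields a List and on a PairStream a Map, without either class redefining the method; foreach, slice, and transform below exploit the same trick.
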
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 1c1ba05ff9..548809a359 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -6,17 +6,20 @@ import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
import spark.streaming._
-import spark.api.java.JavaRDD
+import spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import java.util
import spark.RDD
import JavaDStream._
-trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
+ extends Serializable {
implicit val classManifest: ClassManifest[T]
def dstream: DStream[T]
+ def wrapRDD(in: RDD[T]): R
+
implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
in.map(new JLong(_))
}
@@ -112,8 +115,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -131,10 +134,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -153,8 +156,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
- : JavaPairDStream[K, V] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ : JavaPairDStream[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
}
@@ -212,35 +215,35 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Return all the RDDs between 'fromTime' and 'toTime' (both inclusive)
*/
- def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
- new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
+ def slice(fromTime: Time, toTime: Time): JList[R] = {
+ new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
- dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+ def foreach(foreachFunc: JFunction[R, Void]) {
+ dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
- dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+ def foreach(foreachFunc: JFunction2[R, Time, Void]) {
+ dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
- def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
def scalaTransform (in: RDD[T]): RDD[U] =
- transformFunc.call(new JavaRDD[T](in)).rdd
+ transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
}
@@ -248,11 +251,41 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
- def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
+ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
- transformFunc.call(new JavaRDD[T](in), time).rdd
+ transformFunc.call(wrapRDD(in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
}
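
The extra overloads are needed because Java call sites must name the result type; on the Scala side a single transform already covers both shapes. A hedged Scala counterpart (sortByKey assumed to be available through the SparkContext._ implicits):

    import spark.SparkContext._   // implicit conversions, e.g. sortByKey on pair RDDs
    import spark.streaming.DStream

    // One method handles element- and pair-typed results alike.
    def sortEachBatch(pairs: DStream[(Int, Int)]): DStream[(Int, Int)] =
      pairs.transform(rdd => rdd.sortByKey())
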
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 952ca657bf..30240cad98 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -8,18 +8,21 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import spark.Partitioner
+import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.JavaPairRDD
+import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
+import spark.RDD
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
- extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
+ extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+
+ override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
// =======================================================================
// Methods common to all DStream's
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d9a676819a..d2a0ba725f 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -27,11 +27,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Creates a StreamingContext.
* @param master Name of the Spark Master
- * @param frameworkName Name to be used when registering with the scheduler
+ * @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(master: String, frameworkName: String, batchDuration: Duration) =
- this(new StreamingContext(master, frameworkName, batchDuration))
+ def this(master: String, appName: String, batchDuration: Duration) =
+ this(new StreamingContext(master, appName, batchDuration))
/**
* Creates a StreamingContext.
@@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Registers an output stream that will be computed every interval
*/
- def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+ def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
ssc.registerOutputStream(outputStream.dstream)
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index ddb1bf6b28..4ef4bb7de1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -6,7 +6,7 @@ import spark.streaming.{Time, DStream, Duration}
private[streaming]
class CoGroupedDStream[K : ClassManifest](
- parents: Seq[DStream[(_, _)]],
+ parents: Seq[DStream[(K, _)]],
partitioner: Partitioner
) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
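
Typing the parents as DStream[(K, _)] rather than DStream[(_, _)] turns a key-type mismatch from a runtime ClassCastException into a compile error. The same effect in a self-contained analogue:

    // Every input must agree on the key type K; value types are free.
    def cogroupKeys[K](parents: Seq[Seq[(K, Any)]]): Set[K] =
      parents.flatten.map(_._1).toSet

    cogroupKeys(Seq(Seq(("a", 1)), Seq(("b", 2.0))))           // fine: K = String
    // cogroupKeys[String](Seq(Seq(("a", 1)), Seq((1L, 2))))   // rejected at compile time
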
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index aa5a71e1ed..343b6915e7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -105,8 +105,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD =
- new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
+ //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
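
The cogroup above drives the incremental window update: start from the previous window's reduced value, remove the batches that slid out using the inverse reduce function, then fold in the batches that slid in. The arithmetic in miniature, assuming the reduce function is addition:

    // new window = old window - values leaving + values entering
    def slideWindow(prev: Int, leaving: Seq[Int], entering: Seq[Int]): Int =
      entering.foldLeft(leaving.foldLeft(prev)(_ - _))(_ + _)

    slideWindow(10, Seq(2, 3), Seq(4))  // 10 - 5 + 4 == 9
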
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 5d510fd89f..4530af5f6a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -11,7 +11,9 @@ import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
+import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
+import spark.api.java.JavaRDDLike;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
import spark.storage.StorageLevel;
@@ -293,8 +296,9 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(6,7,8),
Arrays.asList(9,10,11));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> transformed =
+ stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
return in.map(new Function<Integer, Integer>() {
@@ -485,6 +489,141 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 1)));
@Test
+ public void testPairMap() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.map(
+ new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+ return in.swap();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMapPartitions() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+ new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // Maps pair -> single
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> reversed = pairStream.map(
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+ new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
+ List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ }
+ return out;
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -548,7 +687,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -669,7 +808,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
+ new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
@@ -681,7 +820,7 @@ public class JavaAPISuite implements Serializable {
}
return Optional.of(out);
}
- });
+ });
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -741,6 +880,90 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
+ @Override
+ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ return in.sortByKey();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToNormalRDDTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3,1,4,2),
+ Arrays.asList(2,3,4,1));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaDStream<Integer> firstParts = pairStream.transform(
+ new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<Integer, Integer> in) {
+ return in._1();
+ }
+ });
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(firstParts);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
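
These tests pin down signatures that Java needs spelled out but that Scala infers; for comparison, hedged Scala one-liners matching testPairMap and testPairMap2:

    import spark.streaming.DStream

    def swapped(pairs: DStream[(String, Int)]): DStream[(Int, String)] =
      pairs.map(_.swap)   // pair -> pair of a different type

    def valuesOnly(pairs: DStream[(String, Int)]): DStream[Int] =
      pairs.map(_._2)     // pair -> single value
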
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 52ea28732a..64a7e7cbf9 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
* Attach a provided stream to its associated StreamingContext as a
* [[spark.streaming.TestOutputStream]].
**/
- def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
- dstream: JavaDStreamLike[T, This]) = {
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
+ R <: spark.api.java.JavaRDDLike[T, R]](
+ dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,