author     Martin Weindel <martin.weindel@gmail.com>   2013-10-05 23:08:23 +0200
committer  Martin Weindel <martin.weindel@gmail.com>   2013-10-05 23:08:23 +0200
commit     e09f4a9601b18921c309903737d309eab5c6d891 (patch)
tree       7c5c0602a36fe17feff8d0bb1db6c6f30d756199 /streaming
parent     9b0c9c893d1b7d593b98c7117081051977fc81f3 (diff)
fixed some warnings: replaced the deprecated ClassManifest context bounds with scala.reflect.ClassTag in the streaming DStream classes, and parameterized raw generic types in JavaAPISuite
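
Background on the Scala change: ClassManifest was deprecated in Scala 2.10 in favor of the lighter scala.reflect.ClassTag, which still lets generic code recover a type's runtime class after erasure (needed, for example, to construct an Array[T]). A minimal self-contained sketch of the pattern; the makeArray helper is hypothetical and not part of this patch:

    import scala.reflect.ClassTag

    object ClassTagSketch extends App {
      // A context bound `T: ClassTag` supplies an implicit ClassTag[T], which
      // carries T's runtime class and makes Array[T] construction legal.
      def makeArray[T: ClassTag](xs: T*): Array[T] = Array[T](xs: _*)

      println(makeArray(1, 2, 3).mkString(","))  // 1,2,3 (an Array[Int])
      println(makeArray("a", "b").mkString(",")) // a,b   (an Array[String])
    }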
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala | 3
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 94
15 files changed, 76 insertions(+), 61 deletions(-)
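
Each Scala file below gets the same mechanical edit: add the scala.reflect.ClassTag import and switch the context bounds on the class's type parameters from ClassManifest to ClassTag; no other logic changes. A before/after sketch on a hypothetical Box class (not from the patch):

    import scala.reflect.ClassTag

    // Before: class Box[T: ClassManifest](val value: T)  // deprecation warning
    // After:
    class Box[T: ClassTag](val value: T) {
      // implicitly[ClassTag[T]] recovers T's runtime class despite erasure.
      def runtimeClass: Class[_] = implicitly[ClassTag[T]].runtimeClass
    }

    object BoxDemo extends App {
      println(new Box(42).runtimeClass)   // int
      println(new Box("hi").runtimeClass) // class java.lang.String
    }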
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
index 4eddc755b9..16c1567355 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
@@ -21,9 +21,10 @@ import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.CoGroupedRDD
import org.apache.spark.streaming.{Time, DStream, Duration}
+import scala.reflect.ClassTag
private[streaming]
-class CoGroupedDStream[K : ClassManifest](
+class CoGroupedDStream[K : ClassTag](
parents: Seq[DStream[(K, _)]],
partitioner: Partitioner
) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index a9a05c9981..f396c34758 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -19,11 +19,12 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, StreamingContext}
+import scala.reflect.ClassTag
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T])
+class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
extends InputDStream[T](ssc_) {
override def start() {}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index 91ee2c1a36..db2e0a4cee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class FilteredDStream[T: ClassManifest](
+class FilteredDStream[T: ClassTag](
parent: DStream[T],
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index ca7d7ca49e..244dc3ee4f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
private[streaming]
-class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index b37966f9a7..336c4b7a92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
+class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index e21bac4602..98b14cb224 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Job, Time}
+import scala.reflect.ClassTag
private[streaming]
-class ForEachDStream[T: ClassManifest] (
+class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 4294b07d91..23136f44fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class GlommedDStream[T: ClassManifest](parent: DStream[T])
+class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 5329601a6f..8a04060e5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
+class MapPartitionedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 8290df90a2..0ce364fd46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
private[streaming]
-class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
+class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index b1682afea3..c0b7491d09 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.{Duration, DStream, Time}
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
private[streaming]
-class MappedDStream[T: ClassManifest, U: ClassManifest] (
+class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 15782f5c11..6f9477020a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -18,9 +18,10 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.StreamingContext
+import scala.reflect.ClassTag
private[streaming]
-class PluggableInputDStream[T: ClassManifest](
+class PluggableInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 7d9f3521b1..97325f8ea3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
-
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.{Time, StreamingContext}
+import scala.reflect.ClassTag
private[streaming]
-class QueueInputDStream[T: ClassManifest](
+class QueueInputDStream[T: ClassTag](
@transient ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index a95e66d761..e6e0022097 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -21,9 +21,10 @@ import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Duration, DStream, Time}
+import scala.reflect.ClassTag
private[streaming]
-class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 60485adef9..73e1ddf7a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Time}
+import scala.reflect.ClassTag
private[streaming]
-class TransformedDStream[T: ClassManifest, U: ClassManifest] (
+class TransformedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 783b8dea31..076fb53fa1 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,34 +21,36 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+
import kafka.serializer.StringDecoder;
+
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+
import scala.Tuple2;
+import twitter4j.Status;
+
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.SparkFlumeEvent;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.JavaCheckpointTestUtils;
-import org.apache.spark.streaming.InputStreamsSuite;
import java.io.*;
import java.util.*;
import akka.actor.Props;
import akka.zeromq.Subscribe;
-import akka.util.ByteString;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -85,8 +87,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(3L),
Arrays.asList(1L));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream count = stream.count();
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Long> count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
@@ -102,8 +104,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(5,5),
Arrays.asList(9,4));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
@@ -128,8 +130,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,4,5,6),
Arrays.asList(7,8,9));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.window(new Duration(2000));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> windowed = stream.window(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -152,8 +154,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
Arrays.asList(13,14,15,16,17,18));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
@@ -170,8 +172,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("giants"),
Arrays.asList("yankees"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("a");
@@ -193,8 +195,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(Arrays.asList("giants", "dodgers")),
Arrays.asList(Arrays.asList("yankees", "red socks")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream glommed = stream.glom();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<List<String>> glommed = stream.glom();
JavaTestUtils.attachTestOutputStream(glommed);
List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -211,8 +213,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("GIANTSDODGERS"),
Arrays.asList("YANKEESRED SOCKS"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
String out = "";
@@ -254,8 +256,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(15),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream reduced = stream.reduce(new IntegerSum());
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -275,8 +277,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(39),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -349,8 +351,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
Arrays.asList("a","t","h","l","e","t","i","c","s"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split("(?!^)"));
@@ -396,8 +398,8 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, String>(9, "c"),
new Tuple2<Integer, String>(9, "s")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
List<Tuple2<Integer, String>> out = Lists.newArrayList();
@@ -430,10 +432,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(2,2,5,5),
Arrays.asList(3,3,6,6));
- JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
- JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
- JavaDStream unioned = stream1.union(stream2);
+ JavaDStream<Integer> unioned = stream1.union(stream2);
JavaTestUtils.attachTestOutputStream(unioned);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -444,7 +446,7 @@ public class JavaAPISuite implements Serializable {
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
*/
- public static <T extends Comparable> void assertOrderInvariantEquals(
+ public static <T extends Comparable<T>> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
for (List<T> list: expected) {
Collections.sort(list);
@@ -467,11 +469,11 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2 call(String in) throws Exception {
+ public Tuple2<String, Integer> call(String in) throws Exception {
return new Tuple2<String, Integer>(in, in.length());
}
});
@@ -1163,8 +1165,8 @@ public class JavaAPISuite implements Serializable {
File tempDir = Files.createTempDir();
ssc.checkpoint(tempDir.getAbsolutePath());
- JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
@@ -1220,20 +1222,20 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ JavaDStream<String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream<String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zk.connect","localhost:12345");
kafkaParams.put("groupid","consumer-group");
- JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+ JavaDStream<String> test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
StorageLevel.MEMORY_AND_DISK());
}
@Test
public void testSocketTextStream() {
- JavaDStream test = ssc.socketTextStream("localhost", 12345);
+ JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1253,7 +1255,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.socketStream(
+ JavaDStream<String> test = ssc.socketStream(
"localhost",
12345,
new Converter(),
@@ -1262,39 +1264,39 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTextFileStream() {
- JavaDStream test = ssc.textFileStream("/tmp/foo");
+ JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
}
@Test
public void testRawSocketStream() {
- JavaDStream test = ssc.rawSocketStream("localhost", 12345);
+ JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
+ JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
@Test
public void testFileStream() {
JavaPairDStream<String, String> foo =
- ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
}
@Test
public void testTwitterStream() {
String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
+ JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
}
@Test
public void testActorStream() {
- JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+ JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
}
@Test
public void testZeroMQStream() {
- JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+ JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
@Override
public Iterable<String> call(byte[][] b) throws Exception {
return null;