diff options
author | Martin Weindel <martin.weindel@gmail.com> | 2013-10-05 23:08:23 +0200 |
---|---|---|
committer | Martin Weindel <martin.weindel@gmail.com> | 2013-10-05 23:08:23 +0200 |
commit | e09f4a9601b18921c309903737d309eab5c6d891 (patch) | |
tree | 7c5c0602a36fe17feff8d0bb1db6c6f30d756199 /streaming | |
parent | 9b0c9c893d1b7d593b98c7117081051977fc81f3 (diff) | |
download | spark-e09f4a9601b18921c309903737d309eab5c6d891.tar.gz spark-e09f4a9601b18921c309903737d309eab5c6d891.tar.bz2 spark-e09f4a9601b18921c309903737d309eab5c6d891.zip |
fixed some warnings
Diffstat (limited to 'streaming')
15 files changed, 76 insertions, 61 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala index 4eddc755b9..16c1567355 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala @@ -21,9 +21,10 @@ import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.rdd.CoGroupedRDD import org.apache.spark.streaming.{Time, DStream, Duration} +import scala.reflect.ClassTag private[streaming] -class CoGroupedDStream[K : ClassManifest]( +class CoGroupedDStream[K : ClassTag]( parents: Seq[DStream[(K, _)]], partitioner: Partitioner ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala index a9a05c9981..f396c34758 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala @@ -19,11 +19,12 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, StreamingContext} +import scala.reflect.ClassTag /** * An input stream that always returns the same RDD on each timestep. Useful for testing. */ -class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) +class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T]) extends InputDStream[T](ssc_) { override def start() {} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index 91ee2c1a36..db2e0a4cee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class FilteredDStream[T: ClassManifest]( +class FilteredDStream[T: ClassTag]( parent: DStream[T], filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index ca7d7ca49e..244dc3ee4f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import scala.reflect.ClassTag private[streaming] -class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( +class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( parent: DStream[(K, V)], flatMapValueFunc: V => TraversableOnce[U] ) extends DStream[(K, U)](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index b37966f9a7..336c4b7a92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( +class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], flatMapFunc: T => Traversable[U] ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index e21bac4602..98b14cb224 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Job, Time} +import scala.reflect.ClassTag private[streaming] -class ForEachDStream[T: ClassManifest] ( +class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 4294b07d91..23136f44fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class GlommedDStream[T: ClassManifest](parent: DStream[T]) +class GlommedDStream[T: ClassTag](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) { override def dependencies = List(parent) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 5329601a6f..8a04060e5b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( +class MapPartitionedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 8290df90a2..0ce364fd46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -20,9 +20,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import scala.reflect.ClassTag private[streaming] -class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( +class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag]( parent: DStream[(K, V)], mapValueFunc: V => U ) extends DStream[(K, U)](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index b1682afea3..c0b7491d09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, DStream, Time} import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag private[streaming] -class MappedDStream[T: ClassManifest, U: ClassManifest] ( +class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala index 15782f5c11..6f9477020a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -18,9 +18,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag private[streaming] -class PluggableInputDStream[T: ClassManifest]( +class PluggableInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index 7d9f3521b1..97325f8ea3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD - import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer import org.apache.spark.streaming.{Time, StreamingContext} +import scala.reflect.ClassTag private[streaming] -class QueueInputDStream[T: ClassManifest]( +class QueueInputDStream[T: ClassTag]( @transient ssc: StreamingContext, val queue: Queue[RDD[T]], oneAtATime: Boolean, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index a95e66d761..e6e0022097 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -21,9 +21,10 @@ import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] -class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( +class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( parent: DStream[(K,V)], createCombiner: V => C, mergeValue: (C, V) => C, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 60485adef9..73e1ddf7a4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -19,9 +19,10 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] -class TransformedDStream[T: ClassManifest, U: ClassManifest] ( +class TransformedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], transformFunc: (RDD[T], Time) => RDD[U] ) extends DStream[U](parent.ssc) { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 783b8dea31..076fb53fa1 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -21,34 +21,36 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; + import kafka.serializer.StringDecoder; + import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import scala.Tuple2; +import twitter4j.Status; + import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.SparkFlumeEvent; import org.apache.spark.streaming.JavaTestUtils; import org.apache.spark.streaming.JavaCheckpointTestUtils; -import org.apache.spark.streaming.InputStreamsSuite; import java.io.*; import java.util.*; import akka.actor.Props; import akka.zeromq.Subscribe; -import akka.util.ByteString; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -85,8 +87,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(3L), Arrays.asList(1L)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream count = stream.count(); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Long> count = stream.count(); JavaTestUtils.attachTestOutputStream(count); List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); @@ -102,8 +104,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(5,5), Arrays.asList(9,4)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function<String, Integer>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); @@ -128,8 +130,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,4,5,6), Arrays.asList(7,8,9)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(2000)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> windowed = stream.window(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -152,8 +154,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), Arrays.asList(13,14,15,16,17,18)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4); @@ -170,8 +172,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("giants"), Arrays.asList("yankees")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function<String, Boolean>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { return s.contains("a"); @@ -193,8 +195,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(Arrays.asList("giants", "dodgers")), Arrays.asList(Arrays.asList("yankees", "red socks"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream glommed = stream.glom(); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<List<String>> glommed = stream.glom(); JavaTestUtils.attachTestOutputStream(glommed); List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -211,8 +213,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("GIANTSDODGERS"), Arrays.asList("YANKEESRED SOCKS")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { @Override public Iterable<String> call(Iterator<String> in) { String out = ""; @@ -254,8 +256,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(15), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reduced = stream.reduce(new IntegerSum()); JavaTestUtils.attachTestOutputStream(reduced); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -275,8 +277,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(39), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -349,8 +351,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), Arrays.asList("a","t","h","l","e","t","i","c","s")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(x.split("(?!^)")); @@ -396,8 +398,8 @@ public class JavaAPISuite implements Serializable { new Tuple2<Integer, String>(9, "c"), new Tuple2<Integer, String>(9, "s"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { List<Tuple2<Integer, String>> out = Lists.newArrayList(); @@ -430,10 +432,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(2,2,5,5), Arrays.asList(3,3,6,6)); - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); - JavaDStream unioned = stream1.union(stream2); + JavaDStream<Integer> unioned = stream1.union(stream2); JavaTestUtils.attachTestOutputStream(unioned); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -444,7 +446,7 @@ public class JavaAPISuite implements Serializable { * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. */ - public static <T extends Comparable> void assertOrderInvariantEquals( + public static <T extends Comparable<T>> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { for (List<T> list: expected) { Collections.sort(list); @@ -467,11 +469,11 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("giants", 6)), Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = stream.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2 call(String in) throws Exception { + public Tuple2<String, Integer> call(String in) throws Exception { return new Tuple2<String, Integer>(in, in.length()); } }); @@ -1163,8 +1165,8 @@ public class JavaAPISuite implements Serializable { File tempDir = Files.createTempDir(); ssc.checkpoint(tempDir.getAbsolutePath()); - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function<String, Integer>() { + JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); @@ -1220,20 +1222,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaDStream<String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaDStream<String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap<String, String> kafkaParams = Maps.newHashMap(); kafkaParams.put("zk.connect","localhost:12345"); kafkaParams.put("groupid","consumer-group"); - JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + JavaDStream<String> test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK()); } @Test public void testSocketTextStream() { - JavaDStream test = ssc.socketTextStream("localhost", 12345); + JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @Test @@ -1253,7 +1255,7 @@ public class JavaAPISuite implements Serializable { } } - JavaDStream test = ssc.socketStream( + JavaDStream<String> test = ssc.socketStream( "localhost", 12345, new Converter(), @@ -1262,39 +1264,39 @@ public class JavaAPISuite implements Serializable { @Test public void testTextFileStream() { - JavaDStream test = ssc.textFileStream("/tmp/foo"); + JavaDStream<String> test = ssc.textFileStream("/tmp/foo"); } @Test public void testRawSocketStream() { - JavaDStream test = ssc.rawSocketStream("localhost", 12345); + JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } @Test public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); + JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); } @Test public void testFileStream() { JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); } @Test public void testTwitterStream() { String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); + JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); } @Test public void testActorStream() { - JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); + JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); } @Test public void testZeroMQStream() { - JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { + JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { @Override public Iterable<String> call(byte[][] b) throws Exception { return null; |