diff options
Diffstat (limited to 'external/kafka')
3 files changed, 22 insertions, 34 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 1334cc8fd1..d6ca6d58b5 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -20,32 +20,27 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; import java.util.HashMap; import java.util.HashSet; -import java.util.Random; import java.util.Arrays; -import org.apache.spark.SparkConf; - import scala.Tuple2; -import junit.framework.Assert; - import kafka.common.TopicAndPartition; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; -import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.Test; -import org.junit.After; -import org.junit.Before; - public class JavaDirectKafkaStreamSuite implements Serializable { private transient JavaStreamingContext ssc = null; - private transient Random random = new Random(); private transient KafkaStreamSuiteBase suiteBase = null; @Before @@ -93,7 +88,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { ).map( new Function<Tuple2<String, String>, String>() { @Override - public String call(scala.Tuple2<String, String> kv) throws Exception { + public String call(Tuple2<String, String> kv) throws Exception { return kv._2(); } } @@ -121,7 +116,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { unifiedStream.foreachRDD( new Function<JavaRDD<String>, Void>() { @Override - public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception { + public Void call(JavaRDD<String> rdd) throws Exception { result.addAll(rdd.collect()); return null; } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index 9d2e1705c6..4477b81827 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -19,27 +19,22 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; import java.util.HashMap; -import java.util.HashSet; -import java.util.Arrays; - -import org.apache.spark.SparkConf; import scala.Tuple2; -import junit.framework.Assert; - import kafka.common.TopicAndPartition; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.junit.Test; -import org.junit.After; -import org.junit.Before; - public class JavaKafkaRDDSuite implements Serializable { private transient JavaSparkContext sc = null; private transient KafkaStreamSuiteBase suiteBase = null; @@ -78,8 +73,8 @@ public class JavaKafkaRDDSuite implements Serializable { OffsetRange.create(topic2, 0, 0, 1) }; - HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap(); - HashMap<TopicAndPartition, Broker> leaders = new HashMap(); + HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>(); + HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>(); String[] hostAndPort = suiteBase.brokerAddress().split(":"); Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); leaders.put(new TopicAndPartition(topic1, 0), broker); @@ -96,7 +91,7 @@ public class JavaKafkaRDDSuite implements Serializable { ).map( new Function<Tuple2<String, String>, String>() { @Override - public String call(scala.Tuple2<String, String> kv) throws Exception { + public String call(Tuple2<String, String> kv) throws Exception { return kv._2(); } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 208cc51b29..bad0a93eb2 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -22,27 +22,25 @@ import java.util.HashMap; import java.util.List; import java.util.Random; -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.Duration; import scala.Predef; import scala.Tuple2; import scala.collection.JavaConverters; -import junit.framework.Assert; - import kafka.serializer.StringDecoder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.Test; -import org.junit.After; -import org.junit.Before; - public class JavaKafkaStreamSuite implements Serializable { private transient JavaStreamingContext ssc = null; private transient Random random = new Random(); |