diff options
author | Sean Owen <sowen@cloudera.com> | 2015-09-12 10:40:10 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-09-12 10:40:10 +0100 |
commit | 22730ad54d681ad30e63fe910e8d89360853177d (patch) | |
tree | 81194034499a6d391a0949e865fc0aa6dd5fc4ec /external/kafka | |
parent | 8285e3b0d3dc0eff669eba993742dfe0401116f9 (diff) | |
download | spark-22730ad54d681ad30e63fe910e8d89360853177d.tar.gz spark-22730ad54d681ad30e63fe910e8d89360853177d.tar.bz2 spark-22730ad54d681ad30e63fe910e8d89360853177d.zip |
[SPARK-10547] [TEST] Streamline / improve style of Java API tests
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order
Author: Sean Owen <sowen@cloudera.com>
Closes #8706 from srowen/SPARK-10547.
Diffstat (limited to 'external/kafka')
3 files changed, 28 insertions, 27 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 9db07d0507..fbdfbf7e50 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -75,11 +75,11 @@ public class JavaDirectKafkaStreamSuite implements Serializable { String[] topic1data = createTopicAndSendData(topic1); String[] topic2data = createTopicAndSendData(topic2); - HashSet<String> sent = new HashSet<String>(); + Set<String> sent = new HashSet<>(); sent.addAll(Arrays.asList(topic1data)); sent.addAll(Arrays.asList(topic2data)); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); kafkaParams.put("auto.offset.reset", "smallest"); @@ -95,17 +95,17 @@ public class JavaDirectKafkaStreamSuite implements Serializable { // Make sure you can get offset ranges from the rdd new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { @Override - public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { + public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); - Assert.assertEquals(offsets[0].topic(), topic1); + Assert.assertEquals(topic1, offsets[0].topic()); return rdd; } } ).map( new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> kv) throws Exception { + public String call(Tuple2<String, String> kv) { return kv._2(); } } @@ -119,10 +119,10 @@ public class JavaDirectKafkaStreamSuite implements Serializable { StringDecoder.class, String.class, kafkaParams, - topicOffsetToMap(topic2, (long) 0), + topicOffsetToMap(topic2, 0L), new Function<MessageAndMetadata<String, String>, String>() { @Override - public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + public String call(MessageAndMetadata<String, String> msgAndMd) { return msgAndMd.message(); } } @@ -133,7 +133,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { unifiedStream.foreachRDD( new Function<JavaRDD<String>, Void>() { @Override - public Void call(JavaRDD<String> rdd) throws Exception { + public Void call(JavaRDD<String> rdd) { result.addAll(rdd.collect()); for (OffsetRange o : offsetRanges.get()) { System.out.println( @@ -155,14 +155,14 @@ public class JavaDirectKafkaStreamSuite implements Serializable { ssc.stop(); } - private HashSet<String> topicToSet(String topic) { - HashSet<String> topicSet = new HashSet<String>(); + private static Set<String> topicToSet(String topic) { + Set<String> topicSet = new HashSet<>(); topicSet.add(topic); return topicSet; } - private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) { - HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>(); + private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) { + Map<TopicAndPartition, Long> topicMap = new HashMap<>(); topicMap.put(new TopicAndPartition(topic, 0), offsetToStart); return topicMap; } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a9dc6e5061..afcc6cfccd 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka; import java.io.Serializable; import java.util.HashMap; +import java.util.Map; import scala.Tuple2; @@ -66,10 +67,10 @@ public class JavaKafkaRDDSuite implements Serializable { String topic1 = "topic1"; String topic2 = "topic2"; - String[] topic1data = createTopicAndSendData(topic1); - String[] topic2data = createTopicAndSendData(topic2); + createTopicAndSendData(topic1); + createTopicAndSendData(topic2); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); OffsetRange[] offsetRanges = { @@ -77,8 +78,8 @@ public class JavaKafkaRDDSuite implements Serializable { OffsetRange.create(topic2, 0, 0, 1) }; - HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>(); - HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>(); + Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>(); + Map<TopicAndPartition, Broker> leaders = new HashMap<>(); String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); leaders.put(new TopicAndPartition(topic1, 0), broker); @@ -95,7 +96,7 @@ public class JavaKafkaRDDSuite implements Serializable { ).map( new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> kv) throws Exception { + public String call(Tuple2<String, String> kv) { return kv._2(); } } @@ -113,7 +114,7 @@ public class JavaKafkaRDDSuite implements Serializable { emptyLeaders, new Function<MessageAndMetadata<String, String>, String>() { @Override - public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + public String call(MessageAndMetadata<String, String> msgAndMd) { return msgAndMd.message(); } } @@ -131,7 +132,7 @@ public class JavaKafkaRDDSuite implements Serializable { leaders, new Function<MessageAndMetadata<String, String>, String>() { @Override - public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception { + public String call(MessageAndMetadata<String, String> msgAndMd) { return msgAndMd.message(); } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index e4c659215b..1e69de46cd 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -67,10 +67,10 @@ public class JavaKafkaStreamSuite implements Serializable { @Test public void testKafkaStream() throws InterruptedException { String topic = "topic1"; - HashMap<String, Integer> topics = new HashMap<String, Integer>(); + Map<String, Integer> topics = new HashMap<>(); topics.put(topic, 1); - HashMap<String, Integer> sent = new HashMap<String, Integer>(); + Map<String, Integer> sent = new HashMap<>(); sent.put("a", 5); sent.put("b", 3); sent.put("c", 10); @@ -78,7 +78,7 @@ public class JavaKafkaStreamSuite implements Serializable { kafkaTestUtils.createTopic(topic); kafkaTestUtils.sendMessages(topic, sent); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress()); kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000)); kafkaParams.put("auto.offset.reset", "smallest"); @@ -97,7 +97,7 @@ public class JavaKafkaStreamSuite implements Serializable { JavaDStream<String> words = stream.map( new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> tuple2) throws Exception { + public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } } @@ -106,7 +106,7 @@ public class JavaKafkaStreamSuite implements Serializable { words.countByValue().foreachRDD( new Function<JavaPairRDD<String, Long>, Void>() { @Override - public Void call(JavaPairRDD<String, Long> rdd) throws Exception { + public Void call(JavaPairRDD<String, Long> rdd) { List<Tuple2<String, Long>> ret = rdd.collect(); for (Tuple2<String, Long> r : ret) { if (result.containsKey(r._1())) { @@ -130,8 +130,8 @@ public class JavaKafkaStreamSuite implements Serializable { Thread.sleep(200); } Assert.assertEquals(sent.size(), result.size()); - for (String k : sent.keySet()) { - Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue()); + for (Map.Entry<String, Integer> e : sent.entrySet()) { + Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue()); } } } |