author    Sean Owen <sowen@cloudera.com>  2015-09-12 10:40:10 +0100
committer Sean Owen <sowen@cloudera.com>  2015-09-12 10:40:10 +0100
commit    22730ad54d681ad30e63fe910e8d89360853177d (patch)
tree      81194034499a6d391a0949e865fc0aa6dd5fc4ec /external
parent    8285e3b0d3dc0eff669eba993742dfe0401116f9 (diff)
[SPARK-10547] [TEST] Streamline / improve style of Java API tests
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order.

Author: Sean Owen <sowen@cloudera.com>

Closes #8706 from srowen/SPARK-10547.
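The assert argument-order fix matters because JUnit's Assert.assertEquals takes the expected value first; swapped arguments still pass when the values match, but on failure produce a backwards "expected X but was Y" message. A minimal sketch of the convention, assuming a hypothetical test class (none of these names come from the patch):

import org.junit.Assert;
import org.junit.Test;

// Hypothetical test illustrating JUnit's expected-first convention.
public class AssertOrderExample {
  @Test
  public void expectedComesFirst() {
    String expectedTopic = "topic1";
    String actualTopic = lookupTopic();

    // Swapped order would report "expected:<actual> but was:<expected>"
    // on failure, pointing the reader at the wrong value:
    // Assert.assertEquals(actualTopic, expectedTopic);

    // Correct order: expected first, actual second.
    Assert.assertEquals(expectedTopic, actualTopic);
  }

  private String lookupTopic() {
    return "topic1"; // stand-in for the code under test
  }
}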
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 24
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java          | 17
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java       | 14
-rw-r--r--  external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java |  4
4 files changed, 29 insertions(+), 30 deletions(-)
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 9db07d0507..fbdfbf7e50 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -75,11 +75,11 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
- HashSet<String> sent = new HashSet<String>();
+ Set<String> sent = new HashSet<>();
sent.addAll(Arrays.asList(topic1data));
sent.addAll(Arrays.asList(topic2data));
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
kafkaParams.put("auto.offset.reset", "smallest");
@@ -95,17 +95,17 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
// Make sure you can get offset ranges from the rdd
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
- public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+ public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
- Assert.assertEquals(offsets[0].topic(), topic1);
+ Assert.assertEquals(topic1, offsets[0].topic());
return rdd;
}
}
).map(
new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> kv) throws Exception {
+ public String call(Tuple2<String, String> kv) {
return kv._2();
}
}
@@ -119,10 +119,10 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
StringDecoder.class,
String.class,
kafkaParams,
- topicOffsetToMap(topic2, (long) 0),
+ topicOffsetToMap(topic2, 0L),
new Function<MessageAndMetadata<String, String>, String>() {
@Override
- public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
return msgAndMd.message();
}
}
@@ -133,7 +133,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
unifiedStream.foreachRDD(
new Function<JavaRDD<String>, Void>() {
@Override
- public Void call(JavaRDD<String> rdd) throws Exception {
+ public Void call(JavaRDD<String> rdd) {
result.addAll(rdd.collect());
for (OffsetRange o : offsetRanges.get()) {
System.out.println(
@@ -155,14 +155,14 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
ssc.stop();
}
- private HashSet<String> topicToSet(String topic) {
- HashSet<String> topicSet = new HashSet<String>();
+ private static Set<String> topicToSet(String topic) {
+ Set<String> topicSet = new HashSet<>();
topicSet.add(topic);
return topicSet;
}
- private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
- HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
+ private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+ Map<TopicAndPartition, Long> topicMap = new HashMap<>();
topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
return topicMap;
}
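The helper rewrites in this hunk follow two idioms the patch applies throughout: declare variables and return types against the collection interface (Set, Map) rather than the concrete class, and let the Java 7 diamond operator infer the type arguments; making the helpers static also signals they touch no instance state. A standalone sketch of the same pattern (class and method names are illustrative, not Spark API):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class CollectionStyle {
  // Interface return type keeps callers decoupled from HashSet;
  // the diamond operator (<>) infers <String> from the declaration.
  static Set<String> singletonTopicSet(String topic) {
    Set<String> topics = new HashSet<>();
    topics.add(topic);
    return topics;
  }

  // Same idea for maps; the 0L literal also avoids the (long) 0 cast
  // that the patch removes at the topicOffsetToMap call site.
  static Map<String, Long> startingOffsets(String topic) {
    Map<String, Long> offsets = new HashMap<>();
    offsets.put(topic, 0L);
    return offsets;
  }

  public static void main(String[] args) {
    System.out.println(singletonTopicSet("topic1"));  // [topic1]
    System.out.println(startingOffsets("topic1"));    // {topic1=0}
  }
}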
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index a9dc6e5061..afcc6cfccd 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
import scala.Tuple2;
@@ -66,10 +67,10 @@ public class JavaKafkaRDDSuite implements Serializable {
String topic1 = "topic1";
String topic2 = "topic2";
- String[] topic1data = createTopicAndSendData(topic1);
- String[] topic2data = createTopicAndSendData(topic2);
+ createTopicAndSendData(topic1);
+ createTopicAndSendData(topic2);
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
OffsetRange[] offsetRanges = {
@@ -77,8 +78,8 @@ public class JavaKafkaRDDSuite implements Serializable {
OffsetRange.create(topic2, 0, 0, 1)
};
- HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
- HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
+ Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
+ Map<TopicAndPartition, Broker> leaders = new HashMap<>();
String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
leaders.put(new TopicAndPartition(topic1, 0), broker);
@@ -95,7 +96,7 @@ public class JavaKafkaRDDSuite implements Serializable {
).map(
new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> kv) throws Exception {
+ public String call(Tuple2<String, String> kv) {
return kv._2();
}
}
@@ -113,7 +114,7 @@ public class JavaKafkaRDDSuite implements Serializable {
emptyLeaders,
new Function<MessageAndMetadata<String, String>, String>() {
@Override
- public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
return msgAndMd.message();
}
}
@@ -131,7 +132,7 @@ public class JavaKafkaRDDSuite implements Serializable {
leaders,
new Function<MessageAndMetadata<String, String>, String>() {
@Override
- public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+ public String call(MessageAndMetadata<String, String> msgAndMd) {
return msgAndMd.message();
}
}
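Dropping throws Exception from these call overrides is safe because Java lets an overriding method declare fewer checked exceptions than the method it overrides, and omitting the clause documents that the body cannot throw. A self-contained sketch of the rule; the Function interface below is a simplified stand-in for Spark's org.apache.spark.api.java.function.Function, not the real API:

// Simplified stand-in: the real Spark interface likewise declares
// "R call(T t) throws Exception".
interface Function<T, R> {
  R call(T t) throws Exception;
}

public class NarrowedThrowsExample {
  public static void main(String[] args) throws Exception {
    Function<String, Integer> length = new Function<String, Integer>() {
      @Override
      public Integer call(String s) { // override may omit the throws clause
        return s.length();
      }
    };
    // Call sites still see the interface's throws clause, hence main's.
    System.out.println(length.call("topic1")); // prints 6
  }
}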
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index e4c659215b..1e69de46cd 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -67,10 +67,10 @@ public class JavaKafkaStreamSuite implements Serializable {
@Test
public void testKafkaStream() throws InterruptedException {
String topic = "topic1";
- HashMap<String, Integer> topics = new HashMap<String, Integer>();
+ Map<String, Integer> topics = new HashMap<>();
topics.put(topic, 1);
- HashMap<String, Integer> sent = new HashMap<String, Integer>();
+ Map<String, Integer> sent = new HashMap<>();
sent.put("a", 5);
sent.put("b", 3);
sent.put("c", 10);
@@ -78,7 +78,7 @@ public class JavaKafkaStreamSuite implements Serializable {
kafkaTestUtils.createTopic(topic);
kafkaTestUtils.sendMessages(topic, sent);
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
@@ -97,7 +97,7 @@ public class JavaKafkaStreamSuite implements Serializable {
JavaDStream<String> words = stream.map(
new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
+ public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
}
@@ -106,7 +106,7 @@ public class JavaKafkaStreamSuite implements Serializable {
words.countByValue().foreachRDD(
new Function<JavaPairRDD<String, Long>, Void>() {
@Override
- public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+ public Void call(JavaPairRDD<String, Long> rdd) {
List<Tuple2<String, Long>> ret = rdd.collect();
for (Tuple2<String, Long> r : ret) {
if (result.containsKey(r._1())) {
@@ -130,8 +130,8 @@ public class JavaKafkaStreamSuite implements Serializable {
Thread.sleep(200);
}
Assert.assertEquals(sent.size(), result.size());
- for (String k : sent.keySet()) {
- Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+ for (Map.Entry<String, Integer> e : sent.entrySet()) {
+ Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
}
}
}
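The final hunk above swaps keySet() iteration for entrySet(): iterating entries yields the key and the expected value together, so the loop queries only the result map instead of doing a second lookup in sent for every key. A small illustrative sketch (the maps here are made up, not the suite's):

import java.util.HashMap;
import java.util.Map;

public class EntrySetExample {
  public static void main(String[] args) {
    Map<String, Integer> sent = new HashMap<>();
    sent.put("a", 5);
    sent.put("b", 3);
    Map<String, Integer> result = new HashMap<>(sent);

    // keySet() style: every iteration pays for an extra sent.get(k).
    for (String k : sent.keySet()) {
      System.out.println(k + " matches: " + sent.get(k).equals(result.get(k)));
    }

    // entrySet() style: key and expected value come from the same entry.
    for (Map.Entry<String, Integer> e : sent.entrySet()) {
      System.out.println(e.getKey() + " matches: "
          + e.getValue().equals(result.get(e.getKey())));
    }
  }
}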
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index e46b4e5c75..26ec8af455 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -17,8 +17,6 @@
package org.apache.spark.streaming.twitter;
-import java.util.Arrays;
-
import org.junit.Test;
import twitter4j.Status;
import twitter4j.auth.Authorization;
@@ -30,7 +28,7 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
+ String[] filters = { "filter1", "filter2" };
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
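The Twitter fix replaces a round-trip through Arrays.asList(...).toArray() with a plain array initializer. Beyond the wasted List allocation, the (String[]) cast was fragile: it relied on toArray() returning the backing array's runtime type, an implementation detail, and to my understanding Arrays.asList(...).toArray() returns Object[] on JDK 9 and later, where the cast would fail. A minimal before/after sketch under that assumption:

import java.util.Arrays;

public class ArrayInitExample {
  public static void main(String[] args) {
    // Before (left commented out): allocates a List only to copy it back,
    // and the cast can throw ClassCastException on JDK 9+ where
    // Arrays.asList(...).toArray() returns a plain Object[].
    // String[] filters = (String[]) Arrays.<String>asList("filter1", "filter2").toArray();

    // After: an array initializer says the same thing directly and safely.
    String[] filters = { "filter1", "filter2" };
    System.out.println(Arrays.toString(filters)); // [filter1, filter2]
  }
}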