author     Sean Owen <sowen@cloudera.com>    2016-01-08 17:47:44 +0000
committer  Sean Owen <sowen@cloudera.com>    2016-01-08 17:47:44 +0000
commit     b9c835337880f57fe8b953962913bcc524162348 (patch)
tree       5dc476b1a65d513210d1124db144aaa3c5f66679
parent     794ea553bd0fcfece15b610b47ee86d6644134c9 (diff)
[SPARK-12618][CORE][STREAMING][SQL] Clean up build warnings: 2.0.0 edition
Fix most build warnings: mostly deprecated API usages. I'll annotate some of the changes below. CC rxin who is leading the charge to remove the deprecated APIs.

Author: Sean Owen <sowen@cloudera.com>

Closes #10570 from srowen/SPARK-12618.
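The single most common fix in this patch is replacing foreachRDD callbacks written against the deprecated Function<T, Void> overload (which forced a dummy "return null") with the void-returning VoidFunction/VoidFunction2 overloads. A minimal, dependency-free sketch of that shape, using simplified stand-in interfaces rather than Spark's own classes:

import java.util.Arrays;
import java.util.List;

public class VoidCallbackSketch {
  // Simplified stand-ins for org.apache.spark.api.java.function.Function / VoidFunction.
  interface Function<T, R> { R call(T t) throws Exception; }
  interface VoidFunction<T> { void call(T t) throws Exception; }

  public static void main(String[] args) throws Exception {
    List<String> fakeRdd = Arrays.asList("a", "b", "c");

    // Old style: a side-effect-only callback declared as Function<T, Void>.
    Function<List<String>, Void> oldStyle = new Function<List<String>, Void>() {
      @Override
      public Void call(List<String> rdd) {
        System.out.println("count = " + rdd.size());
        return null; // boilerplate needed only to satisfy the Void return type
      }
    };

    // New style: the void-returning interface drops the dummy return entirely.
    VoidFunction<List<String>> newStyle = new VoidFunction<List<String>>() {
      @Override
      public void call(List<String> rdd) {
        System.out.println("count = " + rdd.size());
      }
    };

    oldStyle.call(fakeRdd);
    newStyle.call(fakeRdd);
  }
}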
-rw-r--r--  core/src/test/scala/org/apache/spark/Smuggle.scala | 1
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java | 5
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java | 21
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java | 8
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java | 8
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java | 36
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala | 2
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 7
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 8
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 8
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 8
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java | 4
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java | 18
-rw-r--r--  python/pyspark/mllib/clustering.py | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala | 8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala | 3
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala | 1
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java | 64
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java | 14
24 files changed, 123 insertions(+), 137 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala b/core/src/test/scala/org/apache/spark/Smuggle.scala
index 01694a6e6f..9f0a1b4c25 100644
--- a/core/src/test/scala/org/apache/spark/Smuggle.scala
+++ b/core/src/test/scala/org/apache/spark/Smuggle.scala
@@ -21,6 +21,7 @@ import java.util.UUID
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable
+import scala.language.implicitConversions
/**
* Utility wrapper to "smuggle" objects into tasks while bypassing serialization.
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
index 779fac01c4..3d8babba04 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
@@ -56,6 +56,7 @@ public class JavaBinaryClassificationMetricsExample {
// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
new Function<LabeledPoint, Tuple2<Object, Object>>() {
+ @Override
public Tuple2<Object, Object> call(LabeledPoint p) {
Double prediction = model.predict(p.features());
return new Tuple2<Object, Object>(prediction, p.label());
@@ -88,6 +89,7 @@ public class JavaBinaryClassificationMetricsExample {
// Thresholds
JavaRDD<Double> thresholds = precision.map(
new Function<Tuple2<Object, Object>, Double>() {
+ @Override
public Double call(Tuple2<Object, Object> t) {
return new Double(t._1().toString());
}
@@ -106,8 +108,7 @@ public class JavaBinaryClassificationMetricsExample {
// Save and load model
model.save(sc, "target/tmp/LogisticRegressionModel");
- LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
- "target/tmp/LogisticRegressionModel");
+ LogisticRegressionModel.load(sc, "target/tmp/LogisticRegressionModel");
// $example off$
}
}
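Note: many hunks in this patch simply add @Override to call(...) in anonymous function classes. The annotation has no runtime cost and turns an accidental signature mismatch into a compile error rather than a silently ignored method. An illustrative, self-contained sketch (the Function interface here is a stand-in, not Spark's):

public class OverrideSketch {
  interface Function<T, R> { R call(T t); }

  public static void main(String[] args) {
    Function<String, Integer> length = new Function<String, Integer>() {
      @Override // the compiler verifies this method really overrides call(T)
      public Integer call(String s) {
        return s.length();
      }
    };
    System.out.println(length.call("spark")); // prints 5
  }
}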
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
index 47ab3fc358..4ad2104763 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
@@ -41,6 +41,7 @@ public class JavaRankingMetricsExample {
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Rating> ratings = data.map(
new Function<String, Rating>() {
+ @Override
public Rating call(String line) {
String[] parts = line.split("::");
return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
@@ -57,13 +58,14 @@ public class JavaRankingMetricsExample {
JavaRDD<Tuple2<Object, Rating[]>> userRecs = model.recommendProductsForUsers(10).toJavaRDD();
JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(
new Function<Tuple2<Object, Rating[]>, Tuple2<Object, Rating[]>>() {
+ @Override
public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
Rating[] scaledRatings = new Rating[t._2().length];
for (int i = 0; i < scaledRatings.length; i++) {
double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
}
- return new Tuple2<Object, Rating[]>(t._1(), scaledRatings);
+ return new Tuple2<>(t._1(), scaledRatings);
}
}
);
@@ -72,6 +74,7 @@ public class JavaRankingMetricsExample {
// Map ratings to 1 or 0, 1 indicating a movie that should be recommended
JavaRDD<Rating> binarizedRatings = ratings.map(
new Function<Rating, Rating>() {
+ @Override
public Rating call(Rating r) {
double binaryRating;
if (r.rating() > 0.0) {
@@ -87,6 +90,7 @@ public class JavaRankingMetricsExample {
// Group ratings by common user
JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(
new Function<Rating, Object>() {
+ @Override
public Object call(Rating r) {
return r.user();
}
@@ -96,8 +100,9 @@ public class JavaRankingMetricsExample {
// Get true relevant documents from all user ratings
JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(
new Function<Iterable<Rating>, List<Integer>>() {
+ @Override
public List<Integer> call(Iterable<Rating> docs) {
- List<Integer> products = new ArrayList<Integer>();
+ List<Integer> products = new ArrayList<>();
for (Rating r : docs) {
if (r.rating() > 0.0) {
products.add(r.product());
@@ -111,8 +116,9 @@ public class JavaRankingMetricsExample {
// Extract the product id from each recommendation
JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(
new Function<Rating[], List<Integer>>() {
+ @Override
public List<Integer> call(Rating[] docs) {
- List<Integer> products = new ArrayList<Integer>();
+ List<Integer> products = new ArrayList<>();
for (Rating r : docs) {
products.add(r.product());
}
@@ -124,7 +130,7 @@ public class JavaRankingMetricsExample {
userRecommendedList).values();
// Instantiate the metrics object
- RankingMetrics metrics = RankingMetrics.of(relevantDocs);
+ RankingMetrics<Integer> metrics = RankingMetrics.of(relevantDocs);
// Precision and NDCG at k
Integer[] kVector = {1, 3, 5};
@@ -139,6 +145,7 @@ public class JavaRankingMetricsExample {
// Evaluate the model using numerical ratings and regression metrics
JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
new Function<Rating, Tuple2<Object, Object>>() {
+ @Override
public Tuple2<Object, Object> call(Rating r) {
return new Tuple2<Object, Object>(r.user(), r.product());
}
@@ -147,18 +154,20 @@ public class JavaRankingMetricsExample {
JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
+ @Override
public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
return new Tuple2<Tuple2<Integer, Integer>, Object>(
- new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+ new Tuple2<>(r.user(), r.product()), r.rating());
}
}
));
JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
JavaPairRDD.fromJavaRDD(ratings.map(
new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
+ @Override
public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
return new Tuple2<Tuple2<Integer, Integer>, Object>(
- new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+ new Tuple2<>(r.user(), r.product()), r.rating());
}
}
)).join(predictions).values();
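Note: the Tuple2 and ArrayList changes above use the Java 7 diamond operator, and the RankingMetrics change replaces a raw type with a parameterized one; both silence verbosity or "unchecked" warnings without changing behavior. A self-contained sketch of the idea (variable names here are illustrative):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondSketch {
  public static void main(String[] args) {
    // Before: type arguments repeated on the right-hand side.
    List<Integer> productsOld = new ArrayList<Integer>();
    // After: the diamond lets the compiler infer them from the declared type.
    List<Integer> products = new ArrayList<>();

    // A raw type (Map m = new HashMap()) would compile with an unchecked warning;
    // keeping the type arguments preserves compile-time checking.
    Map<String, List<Integer>> recommendations = new HashMap<>();
    products.add(42);
    recommendations.put("user-1", products);
    productsOld.addAll(products);
    System.out.println(recommendations);
  }
}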
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index 90d473703e..bc963a02be 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -36,6 +36,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
@@ -154,9 +155,9 @@ public final class JavaRecoverableNetworkWordCount {
}
});
- wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+ wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
@Override
- public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+ public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
// Get or register the blacklist Broadcast
final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
@@ -164,7 +165,7 @@ public final class JavaRecoverableNetworkWordCount {
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
- public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+ public Boolean call(Tuple2<String, Integer> wordCount) {
if (blacklist.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
@@ -178,7 +179,6 @@ public final class JavaRecoverableNetworkWordCount {
System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
System.out.println("Appending to " + outputFile.getAbsolutePath());
Files.append(output + "\n", outputFile, Charset.defaultCharset());
- return null;
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 3515d7be45..084f68a8be 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.api.java.StorageLevels;
@@ -78,13 +78,14 @@ public final class JavaSqlNetworkWordCount {
});
// Convert RDDs of the words DStream to DataFrame and run SQL query
- words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
+ words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
@Override
- public Void call(JavaRDD<String> rdd, Time time) {
+ public void call(JavaRDD<String> rdd, Time time) {
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
// Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
+ @Override
public JavaRecord call(String word) {
JavaRecord record = new JavaRecord();
record.setWord(word);
@@ -101,7 +102,6 @@ public final class JavaSqlNetworkWordCount {
sqlContext.sql("select word, count(*) as total from words group by word");
System.out.println("========= " + time + "=========");
wordCountsDataFrame.show();
- return null;
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
index 030ee30b93..d869768026 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -17,13 +17,13 @@
package org.apache.spark.examples.streaming;
-import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -33,8 +33,6 @@ import org.apache.spark.streaming.twitter.TwitterUtils;
import scala.Tuple2;
import twitter4j.Status;
-import java.io.IOException;
-import java.net.URI;
import java.util.Arrays;
import java.util.List;
@@ -44,7 +42,7 @@ import java.util.List;
*/
public class JavaTwitterHashTagJoinSentiments {
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
" <access token> <access token secret> [<filters>]");
@@ -79,7 +77,7 @@ public class JavaTwitterHashTagJoinSentiments {
JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
@Override
- public Boolean call(String word) throws Exception {
+ public Boolean call(String word) {
return word.startsWith("#");
}
});
@@ -91,8 +89,7 @@ public class JavaTwitterHashTagJoinSentiments {
@Override
public Tuple2<String, Double> call(String line) {
String[] columns = line.split("\t");
- return new Tuple2<String, Double>(columns[0],
- Double.parseDouble(columns[1]));
+ return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
}
});
@@ -101,7 +98,7 @@ public class JavaTwitterHashTagJoinSentiments {
@Override
public Tuple2<String, Integer> call(String s) {
// leave out the # character
- return new Tuple2<String, Integer>(s.substring(1), 1);
+ return new Tuple2<>(s.substring(1), 1);
}
});
@@ -120,9 +117,8 @@ public class JavaTwitterHashTagJoinSentiments {
hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
JavaPairRDD<String, Tuple2<Double, Integer>>>() {
@Override
- public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String,
- Integer> topicCount)
- throws Exception {
+ public JavaPairRDD<String, Tuple2<Double, Integer>> call(
+ JavaPairRDD<String, Integer> topicCount) {
return wordSentiments.join(topicCount);
}
});
@@ -131,9 +127,9 @@ public class JavaTwitterHashTagJoinSentiments {
new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
@Override
public Tuple2<String, Double> call(Tuple2<String,
- Tuple2<Double, Integer>> topicAndTuplePair) throws Exception {
+ Tuple2<Double, Integer>> topicAndTuplePair) {
Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
- return new Tuple2<String, Double>(topicAndTuplePair._1(),
+ return new Tuple2<>(topicAndTuplePair._1(),
happinessAndCount._1() * happinessAndCount._2());
}
});
@@ -141,9 +137,8 @@ public class JavaTwitterHashTagJoinSentiments {
JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
new PairFunction<Tuple2<String, Double>, Double, String>() {
@Override
- public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness)
- throws Exception {
- return new Tuple2<Double, String>(topicHappiness._2(),
+ public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
+ return new Tuple2<>(topicHappiness._2(),
topicHappiness._1());
}
});
@@ -151,17 +146,17 @@ public class JavaTwitterHashTagJoinSentiments {
JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
@Override
- public JavaPairRDD<Double, String> call(JavaPairRDD<Double,
- String> happinessAndTopics) throws Exception {
+ public JavaPairRDD<Double, String> call(
+ JavaPairRDD<Double, String> happinessAndTopics) {
return happinessAndTopics.sortByKey(false);
}
}
);
// Print hash tags with the most positive sentiment values
- happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() {
+ happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
@Override
- public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws Exception {
+ public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
System.out.println(
String.format("\nHappiest topics in last 10 seconds (%s total):",
@@ -170,7 +165,6 @@ public class JavaTwitterHashTagJoinSentiments {
System.out.println(
String.format("%s (%s happiness)", pair._2(), pair._1()));
}
- return null;
}
});
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 04dec57b71..e4486b949f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -74,7 +74,7 @@ object SparkHdfsLR {
val conf = new Configuration()
val sc = new SparkContext(sparkConf)
val lines = sc.textFile(inputPath)
- val points = lines.map(parsePoint _).cache()
+ val points = lines.map(parsePoint).cache()
val ITERATIONS = args(1).toInt
// Initialize w to a random value
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
index ddc99d3f90..8b739c9d7c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -71,7 +71,7 @@ object SparkTachyonHdfsLR {
val conf = new Configuration()
val sc = new SparkContext(sparkConf)
val lines = sc.textFile(inputPath)
- val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP)
+ val points = lines.map(parsePoint).persist(StorageLevel.OFF_HEAP)
val ITERATIONS = args(1).toInt
// Initialize w to a random value
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index fbdfbf7e50..4891e4f4a1 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -35,6 +35,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -130,17 +131,15 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
JavaDStream<String> unifiedStream = stream1.union(stream2);
final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
- unifiedStream.foreachRDD(
- new Function<JavaRDD<String>, Void>() {
+ unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
- public Void call(JavaRDD<String> rdd) {
+ public void call(JavaRDD<String> rdd) {
result.addAll(rdd.collect());
for (OffsetRange o : offsetRanges.get()) {
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
);
}
- return null;
}
}
);
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 1e69de46cd..617c92a008 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -103,10 +104,9 @@ public class JavaKafkaStreamSuite implements Serializable {
}
);
- words.countByValue().foreachRDD(
- new Function<JavaPairRDD<String, Long>, Void>() {
+ words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
@Override
- public Void call(JavaPairRDD<String, Long> rdd) {
+ public void call(JavaPairRDD<String, Long> rdd) {
List<Tuple2<String, Long>> ret = rdd.collect();
for (Tuple2<String, Long> r : ret) {
if (result.containsKey(r._1())) {
@@ -115,8 +115,6 @@ public class JavaKafkaStreamSuite implements Serializable {
result.put(r._1(), r._2());
}
}
-
- return null;
}
}
);
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 6fe24fe811..78263f9dca 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -137,8 +137,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Verify that the generated KinesisBackedBlockRDD has the all the right information
val blockInfos = Seq(blockInfo1, blockInfo2)
val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
- nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
- val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+ nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
+ val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
assert(kinesisRDD.regionName === dummyRegionName)
assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
@@ -203,7 +203,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
- stream shouldBe a [ReceiverInputDStream[Int]]
+ stream shouldBe a [ReceiverInputDStream[_]]
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.foreachRDD { rdd =>
@@ -272,7 +272,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
times.foreach { time =>
val (arrayOfSeqNumRanges, data) = collectedData(time)
val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
- rdd shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+ rdd shouldBe a [KinesisBackedBlockRDD[_]]
// Verify the recovered sequence ranges
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
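Note: the [_] wildcards above are needed because the element type of a generic container is erased at run time, so a runtime type test can only check the container itself. The same constraint expressed in Java terms (the value here is illustrative):

import java.util.ArrayList;
import java.util.List;

public class ErasureSketch {
  public static void main(String[] args) {
    Object value = new ArrayList<String>();
    // if (value instanceof List<String>) { ... }  // does not compile: the type argument is erased
    if (value instanceof List<?>) {                // compiles: the wildcard claims nothing about elements
      System.out.println("value is some kind of List");
    }
  }
}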
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index e47c4db629..ca11ede4cc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.Since
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
import org.apache.spark.mllib.util.MLUtils
@@ -107,7 +107,7 @@ class KMeans private (
* Number of runs of the algorithm to execute in parallel.
*/
@Since("1.4.0")
- @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0")
+ @deprecated("Support for runs is deprecated. This param will have no effect in 2.0.0.", "1.6.0")
def getRuns: Int = runs
/**
@@ -117,7 +117,7 @@ class KMeans private (
* return the best clustering found over any run. Default: 1.
*/
@Since("0.8.0")
- @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0")
+ @deprecated("Support for runs is deprecated. This param will have no effect in 2.0.0.", "1.6.0")
def setRuns(runs: Int): this.type = {
if (runs <= 0) {
throw new IllegalArgumentException("Number of runs must be positive")
@@ -431,7 +431,7 @@ class KMeans private (
val rs = (0 until runs).filter { r =>
rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
}
- if (rs.length > 0) Some(p, rs) else None
+ if (rs.length > 0) Some((p, rs)) else None
}
}.collect()
mergeNewCenters()
diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
index 271dda4662..a6631ed7eb 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
@@ -56,10 +56,10 @@ public class JavaALSSuite implements Serializable {
double matchThreshold,
boolean implicitPrefs,
DoubleMatrix truePrefs) {
- List<Tuple2<Integer, Integer>> localUsersProducts = new ArrayList(users * products);
+ List<Tuple2<Integer, Integer>> localUsersProducts = new ArrayList<>(users * products);
for (int u=0; u < users; ++u) {
for (int p=0; p < products; ++p) {
- localUsersProducts.add(new Tuple2<Integer, Integer>(u, p));
+ localUsersProducts.add(new Tuple2<>(u, p));
}
}
JavaPairRDD<Integer, Integer> usersProducts = sc.parallelizePairs(localUsersProducts);
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
index 32c2f4f339..3db9b39e74 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -36,11 +36,11 @@ import org.apache.spark.api.java.JavaSparkContext;
public class JavaIsotonicRegressionSuite implements Serializable {
private transient JavaSparkContext sc;
- private List<Tuple3<Double, Double, Double>> generateIsotonicInput(double[] labels) {
- ArrayList<Tuple3<Double, Double, Double>> input = new ArrayList(labels.length);
+ private static List<Tuple3<Double, Double, Double>> generateIsotonicInput(double[] labels) {
+ List<Tuple3<Double, Double, Double>> input = new ArrayList<>(labels.length);
for (int i = 1; i <= labels.length; i++) {
- input.add(new Tuple3<Double, Double, Double>(labels[i-1], (double) i, 1d));
+ input.add(new Tuple3<>(labels[i-1], (double) i, 1.0));
}
return input;
@@ -70,7 +70,7 @@ public class JavaIsotonicRegressionSuite implements Serializable {
runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});
Assert.assertArrayEquals(
- new double[] {1, 2, 7d/3, 7d/3, 6, 7, 8, 10, 10, 12}, model.predictions(), 1e-14);
+ new double[] {1, 2, 7.0/3, 7.0/3, 6, 7, 8, 10, 10, 12}, model.predictions(), 1.0e-14);
}
@Test
@@ -81,10 +81,10 @@ public class JavaIsotonicRegressionSuite implements Serializable {
JavaDoubleRDD testRDD = sc.parallelizeDoubles(Arrays.asList(0.0, 1.0, 9.5, 12.0, 13.0));
List<Double> predictions = model.predict(testRDD).collect();
- Assert.assertTrue(predictions.get(0) == 1d);
- Assert.assertTrue(predictions.get(1) == 1d);
- Assert.assertTrue(predictions.get(2) == 10d);
- Assert.assertTrue(predictions.get(3) == 12d);
- Assert.assertTrue(predictions.get(4) == 12d);
+ Assert.assertEquals(1.0, predictions.get(0).doubleValue(), 1.0e-14);
+ Assert.assertEquals(1.0, predictions.get(1).doubleValue(), 1.0e-14);
+ Assert.assertEquals(10.0, predictions.get(2).doubleValue(), 1.0e-14);
+ Assert.assertEquals(12.0, predictions.get(3).doubleValue(), 1.0e-14);
+ Assert.assertEquals(12.0, predictions.get(4).doubleValue(), 1.0e-14);
}
}
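Note: the assertion changes above replace exact equality on doubles with assertEquals plus a tolerance, the form JUnit expects for floating-point results. A dependency-free sketch of why the tolerance matters:

public class DoubleCompareSketch {
  static boolean approxEquals(double expected, double actual, double delta) {
    return Math.abs(expected - actual) <= delta;
  }

  public static void main(String[] args) {
    double prediction = 0.1 + 0.2;                              // 0.30000000000000004 under IEEE 754
    System.out.println(prediction == 0.3);                      // false: exact comparison is brittle
    System.out.println(approxEquals(0.3, prediction, 1.0e-14)); // true: the delta absorbs rounding error
  }
}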
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 48daa87e82..d22a7f4c3b 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -173,7 +173,7 @@ class KMeans(object):
"""Train a k-means clustering model."""
if runs != 1:
warnings.warn(
- "Support for runs is deprecated in 1.6.0. This param will have no effect in 1.7.0.")
+ "Support for runs is deprecated in 1.6.0. This param will have no effect in 2.0.0.")
clusterInitialModel = []
if initialModel is not None:
if not isinstance(initialModel, KMeansModel):
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index f869a96edb..e028d22a54 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -57,8 +57,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
(result, expected) match {
case (result: Array[Byte], expected: Array[Byte]) =>
java.util.Arrays.equals(result, expected)
- case (result: Double, expected: Spread[Double]) =>
- expected.isWithin(result)
+ case (result: Double, expected: Spread[Double @unchecked]) =>
+ expected.asInstanceOf[Spread[Double]].isWithin(result)
case _ => result == expected
}
}
@@ -275,8 +275,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
(result, expected) match {
case (result: Array[Byte], expected: Array[Byte]) =>
java.util.Arrays.equals(result, expected)
- case (result: Double, expected: Spread[Double]) =>
- expected.isWithin(result)
+ case (result: Double, expected: Spread[Double @unchecked]) =>
+ expected.asInstanceOf[Spread[Double]].isWithin(result)
case (result: Double, expected: Double) if result.isNaN && expected.isNaN =>
true
case (result: Float, expected: Float) if result.isNaN && expected.isNaN =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index d5f1c4d74e..6745b4b6c3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -384,9 +384,6 @@ class DateTimeUtilsSuite extends SparkFunSuite {
Timestamp.valueOf("1700-02-28 12:14:50.123456")).foreach { t =>
val us = fromJavaTimestamp(t)
assert(toJavaTimestamp(us) === t)
- assert(getHours(us) === t.getHours)
- assert(getMinutes(us) === t.getMinutes)
- assert(getSeconds(us) === t.getSeconds)
}
}
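Note: the removed assertions relied on java.util.Date accessors (getHours, getMinutes, getSeconds) that have been deprecated since JDK 1.1 and were the source of the warnings. If the same fields were still needed on Java 8+, one non-deprecated route is java.time, shown here for illustration only and not as part of the patch:

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampFieldsSketch {
  public static void main(String[] args) {
    Timestamp t = Timestamp.valueOf("2015-03-18 12:03:17.123456");
    LocalDateTime ldt = t.toLocalDateTime();
    System.out.println(ldt.getHour());   // 12, replaces t.getHours()
    System.out.println(ldt.getMinute()); // 3,  replaces t.getMinutes()
    System.out.println(ldt.getSecond()); // 17, replaces t.getSeconds()
  }
}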
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index f8e32d60a4..6bcd155ccd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.parquet;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -62,7 +63,7 @@ import org.apache.parquet.schema.Types;
import org.apache.spark.sql.types.StructType;
/**
- * Base class for custom RecordReaaders for Parquet that directly materialize to `T`.
+ * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
* This class handles computing row groups, filtering on them, setting up the column readers,
* etc.
* This is heavily based on parquet-mr's RecordReader.
@@ -83,6 +84,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
protected ParquetFileReader reader;
+ @Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
@@ -131,8 +133,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
}
this.fileSchema = footer.getFileMetaData().getSchema();
Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
- ReadSupport<T> readSupport = getReadSupportInstance(
- (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
+ ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
@@ -282,8 +283,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
return Collections.unmodifiableMap(setMultiMap);
}
- private static Class<?> getReadSupportClass(Configuration configuration) {
- return ConfigurationUtil.getClassFromConfig(configuration,
+ @SuppressWarnings("unchecked")
+ private Class<? extends ReadSupport<T>> getReadSupportClass(Configuration configuration) {
+ return (Class<? extends ReadSupport<T>>) ConfigurationUtil.getClassFromConfig(configuration,
ParquetInputFormat.READ_SUPPORT_CLASS, ReadSupport.class);
}
@@ -294,10 +296,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
private static <T> ReadSupport<T> getReadSupportInstance(
Class<? extends ReadSupport<T>> readSupportClass){
try {
- return readSupportClass.newInstance();
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate read support class", e);
- } catch (IllegalAccessException e) {
+ return readSupportClass.getConstructor().newInstance();
+ } catch (InstantiationException | IllegalAccessException |
+ NoSuchMethodException | InvocationTargetException e) {
throw new BadConfigurationException("could not instantiate read support class", e);
}
}
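Note: the reflection change above avoids Class.newInstance(), which propagates checked exceptions from the constructor without declaring them (and is deprecated in later JDKs); getConstructor().newInstance() surfaces them as InvocationTargetException instead. A self-contained sketch with an illustrative stub class, not the real ReadSupport:

import java.lang.reflect.InvocationTargetException;

public class ReflectiveInstantiationSketch {
  public static class ReadSupportStub {
    public ReadSupportStub() { System.out.println("constructed"); }
  }

  static <T> T instantiate(Class<T> cls) {
    try {
      // Invokes the public no-arg constructor reflectively.
      return cls.getConstructor().newInstance();
    } catch (InstantiationException | IllegalAccessException
        | NoSuchMethodException | InvocationTargetException e) {
      throw new IllegalStateException("could not instantiate " + cls.getName(), e);
    }
  }

  public static void main(String[] args) {
    ReadSupportStub stub = instantiate(ReadSupportStub.class);
    System.out.println(stub.getClass().getSimpleName());
  }
}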
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 076db0c08d..eb4efcd1d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -580,7 +580,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
)
}
- test("sparkPartitionId") {
+ test("spark_partition_id") {
// Make sure we have 2 partitions, each with 2 records.
val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
Iterator(Tuple1(1), Tuple1(2))
@@ -591,7 +591,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
)
}
- test("InputFileName") {
+ test("input_file_name") {
withTempPath { dir =>
val data = sparkContext.parallelize(0 to 10).toDF("id")
data.write.parquet(dir.getCanonicalPath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 0e60573dc6..fac26bd0c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import java.util.{Locale, TimeZone}
import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
@@ -206,7 +207,7 @@ abstract class QueryTest extends PlanTest {
val jsonString = try {
logicalPlan.toJSON
} catch {
- case e =>
+ case NonFatal(e) =>
fail(
s"""
|Failed to parse logical plan to JSON:
@@ -231,7 +232,7 @@ abstract class QueryTest extends PlanTest {
val jsonBackPlan = try {
TreeNode.fromJSON[LogicalPlan](jsonString, sqlContext.sparkContext)
} catch {
- case e =>
+ case NonFatal(e) =>
fail(
s"""
|Failed to rebuild the logical plan from JSON:
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
index 97cba1e349..1529313dfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -60,6 +60,7 @@ object ColumnarTestUtils {
case MAP(_) =>
ArrayBasedMapData(
Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32)))))
+ case _ => throw new IllegalArgumentException(s"Unknown column type $columnType")
}).asInstanceOf[JvmType]
}
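Note: the ColumnarTestUtils change adds a catch-all case so the Scala compiler no longer warns that the match may not be exhaustive. The closest defensive idiom in Java is a switch whose default branch fails loudly for values the code does not expect (the enum and values below are illustrative):

public class ExhaustiveSwitchSketch {
  enum ColumnKind { INT, STRING, MAP }

  static Object randomValue(ColumnKind kind) {
    switch (kind) {
      case INT:    return 42;
      case STRING: return "spark";
      case MAP:    return java.util.Collections.singletonMap(1, "one");
      default:
        throw new IllegalArgumentException("Unknown column kind " + kind);
    }
  }

  public static void main(String[] args) {
    for (ColumnKind kind : ColumnKind.values()) {
      System.out.println(kind + " -> " + randomValue(kind));
    }
  }
}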
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9722c60bba..ddc56fc869 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -772,8 +772,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
public void testForeachRDD() {
- final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
- final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,1,1),
Arrays.asList(1,1,1));
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index bc4bc2eb42..20e2a1c3d5 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -18,6 +18,7 @@
package org.apache.spark.streaming;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -26,10 +27,10 @@ import java.util.Set;
import scala.Tuple2;
import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.util.ManualClock;
import org.junit.Assert;
@@ -51,10 +52,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
- final Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>
- mappingFunc =
+ Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
-
@Override
public Optional<Double> call(
Time time, String word, Optional<Integer> one, State<Boolean> state) {
@@ -76,11 +75,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> stateSnapshots = stateDstream.stateSnapshots();
+ stateDstream.stateSnapshots();
- final Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
+ Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
-
@Override
public Double call(String key, Optional<Integer> one, State<Boolean> state) {
// Use all State's methods here
@@ -95,13 +93,13 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function(mappingFunc2)
+ StateSpec.function(mappingFunc2)
.initialState(initialRDD)
.numPartitions(10)
.partitioner(new HashPartitioner(10))
.timeout(Durations.seconds(10)));
- JavaPairDStream<String, Boolean> stateSnapshots2 = stateDstream2.stateSnapshots();
+ stateDstream2.stateSnapshots();
}
@Test
@@ -126,33 +124,21 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
Collections.<Integer>emptySet()
);
+ @SuppressWarnings("unchecked")
List<Set<Tuple2<String, Integer>>> stateData = Arrays.asList(
Collections.<Tuple2<String, Integer>>emptySet(),
- Sets.newHashSet(new Tuple2<String, Integer>("a", 1)),
- Sets.newHashSet(new Tuple2<String, Integer>("a", 2), new Tuple2<String, Integer>("b", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 3),
- new Tuple2<String, Integer>("b", 2),
- new Tuple2<String, Integer>("c", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 4),
- new Tuple2<String, Integer>("b", 3),
- new Tuple2<String, Integer>("c", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 5),
- new Tuple2<String, Integer>("b", 3),
- new Tuple2<String, Integer>("c", 1)),
- Sets.newHashSet(
- new Tuple2<String, Integer>("a", 5),
- new Tuple2<String, Integer>("b", 3),
- new Tuple2<String, Integer>("c", 1))
+ Sets.newHashSet(new Tuple2<>("a", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 2), new Tuple2<>("b", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 3), new Tuple2<>("b", 2), new Tuple2<>("c", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 4), new Tuple2<>("b", 3), new Tuple2<>("c", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1)),
+ Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1))
);
Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
-
@Override
- public Integer call(String key, Optional<Integer> value, State<Integer> state) throws Exception {
+ public Integer call(String key, Optional<Integer> value, State<Integer> state) {
int sum = value.or(0) + (state.exists() ? state.get() : 0);
state.update(sum);
return sum;
@@ -160,7 +146,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
};
testOperation(
inputData,
- StateSpec.<String, Integer, Integer, Integer>function(mappingFunc),
+ StateSpec.function(mappingFunc),
outputData,
stateData);
}
@@ -175,27 +161,25 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
@Override
- public Tuple2<K, Integer> call(K x) throws Exception {
- return new Tuple2<K, Integer>(x, 1);
+ public Tuple2<K, Integer> call(K x) {
+ return new Tuple2<>(x, 1);
}
})).mapWithState(mapWithStateSpec);
final List<Set<T>> collectedOutputs =
- Collections.synchronizedList(Lists.<Set<T>>newArrayList());
- mapWithStateDStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
+ Collections.synchronizedList(new ArrayList<Set<T>>());
+ mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
@Override
- public Void call(JavaRDD<T> rdd) throws Exception {
+ public void call(JavaRDD<T> rdd) {
collectedOutputs.add(Sets.newHashSet(rdd.collect()));
- return null;
}
});
final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
- Collections.synchronizedList(Lists.<Set<Tuple2<K, S>>>newArrayList());
- mapWithStateDStream.stateSnapshots().foreachRDD(new Function<JavaPairRDD<K, S>, Void>() {
+ Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
+ mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
@Override
- public Void call(JavaPairRDD<K, S> rdd) throws Exception {
+ public void call(JavaPairRDD<K, S> rdd) {
collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
- return null;
}
});
BatchCounter batchCounter = new BatchCounter(ssc.ssc());
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 7a8ef9d147..d09258e0e4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -18,13 +18,14 @@
package org.apache.spark.streaming;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import static org.junit.Assert.*;
import com.google.common.io.Closeables;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -68,12 +69,11 @@ public class JavaReceiverAPISuite implements Serializable {
return v1 + ".";
}
});
- mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+ mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
- public Void call(JavaRDD<String> rdd) {
+ public void call(JavaRDD<String> rdd) {
long count = rdd.count();
dataCounter.addAndGet(count);
- return null;
}
});
@@ -90,7 +90,7 @@ public class JavaReceiverAPISuite implements Serializable {
Thread.sleep(100);
}
ssc.stop();
- assertTrue(dataCounter.get() > 0);
+ Assert.assertTrue(dataCounter.get() > 0);
} finally {
server.stop();
}
@@ -98,8 +98,8 @@ public class JavaReceiverAPISuite implements Serializable {
private static class JavaSocketReceiver extends Receiver<String> {
- String host = null;
- int port = -1;
+ private String host = null;
+ private int port = -1;
JavaSocketReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK());