From b9c835337880f57fe8b953962913bcc524162348 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Fri, 8 Jan 2016 17:47:44 +0000
Subject: [SPARK-12618][CORE][STREAMING][SQL] Clean up build warnings: 2.0.0 edition

Fix most build warnings: mostly deprecated API usages. I'll annotate some of
the changes below. CC rxin who is leading the charge to remove the deprecated
APIs.

Author: Sean Owen

Closes #10570 from srowen/SPARK-12618.
---
 core/src/test/scala/org/apache/spark/Smuggle.scala  |  1 +
 .../JavaBinaryClassificationMetricsExample.java     |  5 +-
 .../examples/mllib/JavaRankingMetricsExample.java   | 21 +++++--
 .../streaming/JavaRecoverableNetworkWordCount.java  |  8 +--
 .../streaming/JavaSqlNetworkWordCount.java          |  8 +--
 .../JavaTwitterHashTagJoinSentiments.java           | 36 +++++-------
 .../org/apache/spark/examples/SparkHdfsLR.scala     |  2 +-
 .../apache/spark/examples/SparkTachyonHdfsLR.scala  |  2 +-
 .../kafka/JavaDirectKafkaStreamSuite.java           |  7 +--
 .../streaming/kafka/JavaKafkaStreamSuite.java       |  8 +--
 .../streaming/kinesis/KinesisStreamSuite.scala      |  8 +--
 .../org/apache/spark/mllib/clustering/KMeans.scala  |  8 +--
 .../spark/mllib/recommendation/JavaALSSuite.java    |  4 +-
 .../regression/JavaIsotonicRegressionSuite.java     | 18 +++---
 python/pyspark/mllib/clustering.py                  |  2 +-
 .../expressions/ExpressionEvalHelper.scala          |  8 +--
 .../sql/catalyst/util/DateTimeUtilsSuite.scala      |  3 -
 .../parquet/SpecificParquetRecordReaderBase.java    | 19 ++++---
 .../apache/spark/sql/ColumnExpressionSuite.scala    |  4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala      |  5 +-
 .../sql/execution/columnar/ColumnarTestUtils.scala  |  1 +
 .../org/apache/spark/streaming/JavaAPISuite.java    |  4 +-
 .../spark/streaming/JavaMapWithStateSuite.java      | 64 ++++++++--------------
 .../spark/streaming/JavaReceiverAPISuite.java       | 14 ++---
 24 files changed, 123 insertions(+), 137 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala b/core/src/test/scala/org/apache/spark/Smuggle.scala
index 01694a6e6f..9f0a1b4c25 100644
--- a/core/src/test/scala/org/apache/spark/Smuggle.scala
+++ b/core/src/test/scala/org/apache/spark/Smuggle.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.mutable
+import scala.language.implicitConversions
 
 /**
  * Utility wrapper to "smuggle" objects into tasks while bypassing serialization.
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
index 779fac01c4..3d8babba04 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
@@ -56,6 +56,7 @@ public class JavaBinaryClassificationMetricsExample {
     // Compute raw scores on the test set.
JavaRDD> predictionAndLabels = test.map( new Function>() { + @Override public Tuple2 call(LabeledPoint p) { Double prediction = model.predict(p.features()); return new Tuple2(prediction, p.label()); @@ -88,6 +89,7 @@ public class JavaBinaryClassificationMetricsExample { // Thresholds JavaRDD thresholds = precision.map( new Function, Double>() { + @Override public Double call(Tuple2 t) { return new Double(t._1().toString()); } @@ -106,8 +108,7 @@ public class JavaBinaryClassificationMetricsExample { // Save and load model model.save(sc, "target/tmp/LogisticRegressionModel"); - LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc, - "target/tmp/LogisticRegressionModel"); + LogisticRegressionModel.load(sc, "target/tmp/LogisticRegressionModel"); // $example off$ } } diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java index 47ab3fc358..4ad2104763 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java @@ -41,6 +41,7 @@ public class JavaRankingMetricsExample { JavaRDD data = sc.textFile(path); JavaRDD ratings = data.map( new Function() { + @Override public Rating call(String line) { String[] parts = line.split("::"); return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double @@ -57,13 +58,14 @@ public class JavaRankingMetricsExample { JavaRDD> userRecs = model.recommendProductsForUsers(10).toJavaRDD(); JavaRDD> userRecsScaled = userRecs.map( new Function, Tuple2>() { + @Override public Tuple2 call(Tuple2 t) { Rating[] scaledRatings = new Rating[t._2().length]; for (int i = 0; i < scaledRatings.length; i++) { double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0); scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating); } - return new Tuple2(t._1(), scaledRatings); + return new Tuple2<>(t._1(), scaledRatings); } } ); @@ -72,6 +74,7 @@ public class JavaRankingMetricsExample { // Map ratings to 1 or 0, 1 indicating a movie that should be recommended JavaRDD binarizedRatings = ratings.map( new Function() { + @Override public Rating call(Rating r) { double binaryRating; if (r.rating() > 0.0) { @@ -87,6 +90,7 @@ public class JavaRankingMetricsExample { // Group ratings by common user JavaPairRDD> userMovies = binarizedRatings.groupBy( new Function() { + @Override public Object call(Rating r) { return r.user(); } @@ -96,8 +100,9 @@ public class JavaRankingMetricsExample { // Get true relevant documents from all user ratings JavaPairRDD> userMoviesList = userMovies.mapValues( new Function, List>() { + @Override public List call(Iterable docs) { - List products = new ArrayList(); + List products = new ArrayList<>(); for (Rating r : docs) { if (r.rating() > 0.0) { products.add(r.product()); @@ -111,8 +116,9 @@ public class JavaRankingMetricsExample { // Extract the product id from each recommendation JavaPairRDD> userRecommendedList = userRecommended.mapValues( new Function>() { + @Override public List call(Rating[] docs) { - List products = new ArrayList(); + List products = new ArrayList<>(); for (Rating r : docs) { products.add(r.product()); } @@ -124,7 +130,7 @@ public class JavaRankingMetricsExample { userRecommendedList).values(); // Instantiate the metrics object - RankingMetrics metrics = RankingMetrics.of(relevantDocs); + RankingMetrics metrics = 
RankingMetrics.of(relevantDocs); // Precision and NDCG at k Integer[] kVector = {1, 3, 5}; @@ -139,6 +145,7 @@ public class JavaRankingMetricsExample { // Evaluate the model using numerical ratings and regression metrics JavaRDD> userProducts = ratings.map( new Function>() { + @Override public Tuple2 call(Rating r) { return new Tuple2(r.user(), r.product()); } @@ -147,18 +154,20 @@ public class JavaRankingMetricsExample { JavaPairRDD, Object> predictions = JavaPairRDD.fromJavaRDD( model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( new Function, Object>>() { + @Override public Tuple2, Object> call(Rating r) { return new Tuple2, Object>( - new Tuple2(r.user(), r.product()), r.rating()); + new Tuple2<>(r.user(), r.product()), r.rating()); } } )); JavaRDD> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map( new Function, Object>>() { + @Override public Tuple2, Object> call(Rating r) { return new Tuple2, Object>( - new Tuple2(r.user(), r.product()), r.rating()); + new Tuple2<>(r.user(), r.product()), r.rating()); } } )).join(predictions).values(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index 90d473703e..bc963a02be 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -36,6 +36,7 @@ import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; @@ -154,9 +155,9 @@ public final class JavaRecoverableNetworkWordCount { } }); - wordCounts.foreachRDD(new Function2, Time, Void>() { + wordCounts.foreachRDD(new VoidFunction2, Time>() { @Override - public Void call(JavaPairRDD rdd, Time time) throws IOException { + public void call(JavaPairRDD rdd, Time time) throws IOException { // Get or register the blacklist Broadcast final Broadcast> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); // Get or register the droppedWordsCounter Accumulator @@ -164,7 +165,7 @@ public final class JavaRecoverableNetworkWordCount { // Use blacklist to drop words and use droppedWordsCounter to count them String counts = rdd.filter(new Function, Boolean>() { @Override - public Boolean call(Tuple2 wordCount) throws Exception { + public Boolean call(Tuple2 wordCount) { if (blacklist.value().contains(wordCount._1())) { droppedWordsCounter.add(wordCount._2()); return false; @@ -178,7 +179,6 @@ public final class JavaRecoverableNetworkWordCount { System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); System.out.println("Appending to " + outputFile.getAbsolutePath()); Files.append(output + "\n", outputFile, Charset.defaultCharset()); - return null; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java index 3515d7be45..084f68a8be 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java +++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java @@ -26,7 +26,7 @@ import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.api.java.StorageLevels; @@ -78,13 +78,14 @@ public final class JavaSqlNetworkWordCount { }); // Convert RDDs of the words DStream to DataFrame and run SQL query - words.foreachRDD(new Function2, Time, Void>() { + words.foreachRDD(new VoidFunction2, Time>() { @Override - public Void call(JavaRDD rdd, Time time) { + public void call(JavaRDD rdd, Time time) { SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context()); // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame JavaRDD rowRDD = rdd.map(new Function() { + @Override public JavaRecord call(String word) { JavaRecord record = new JavaRecord(); record.setWord(word); @@ -101,7 +102,6 @@ public final class JavaSqlNetworkWordCount { sqlContext.sql("select word, count(*) as total from words group by word"); System.out.println("========= " + time + "========="); wordCountsDataFrame.show(); - return null; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java index 030ee30b93..d869768026 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java @@ -17,13 +17,13 @@ package org.apache.spark.examples.streaming; -import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -33,8 +33,6 @@ import org.apache.spark.streaming.twitter.TwitterUtils; import scala.Tuple2; import twitter4j.Status; -import java.io.IOException; -import java.net.URI; import java.util.Arrays; import java.util.List; @@ -44,7 +42,7 @@ import java.util.List; */ public class JavaTwitterHashTagJoinSentiments { - public static void main(String[] args) throws IOException { + public static void main(String[] args) { if (args.length < 4) { System.err.println("Usage: JavaTwitterHashTagJoinSentiments " + " []"); @@ -79,7 +77,7 @@ public class JavaTwitterHashTagJoinSentiments { JavaDStream hashTags = words.filter(new Function() { @Override - public Boolean call(String word) throws Exception { + public Boolean call(String word) { return word.startsWith("#"); } }); @@ -91,8 +89,7 @@ public class JavaTwitterHashTagJoinSentiments { @Override public Tuple2 call(String line) { String[] columns = line.split("\t"); - return new Tuple2(columns[0], - Double.parseDouble(columns[1])); + return new Tuple2<>(columns[0], Double.parseDouble(columns[1])); } }); @@ -101,7 +98,7 @@ 
public class JavaTwitterHashTagJoinSentiments { @Override public Tuple2 call(String s) { // leave out the # character - return new Tuple2(s.substring(1), 1); + return new Tuple2<>(s.substring(1), 1); } }); @@ -120,9 +117,8 @@ public class JavaTwitterHashTagJoinSentiments { hashTagTotals.transformToPair(new Function, JavaPairRDD>>() { @Override - public JavaPairRDD> call(JavaPairRDD topicCount) - throws Exception { + public JavaPairRDD> call( + JavaPairRDD topicCount) { return wordSentiments.join(topicCount); } }); @@ -131,9 +127,9 @@ public class JavaTwitterHashTagJoinSentiments { new PairFunction>, String, Double>() { @Override public Tuple2 call(Tuple2> topicAndTuplePair) throws Exception { + Tuple2> topicAndTuplePair) { Tuple2 happinessAndCount = topicAndTuplePair._2(); - return new Tuple2(topicAndTuplePair._1(), + return new Tuple2<>(topicAndTuplePair._1(), happinessAndCount._1() * happinessAndCount._2()); } }); @@ -141,9 +137,8 @@ public class JavaTwitterHashTagJoinSentiments { JavaPairDStream happinessTopicPairs = topicHappiness.mapToPair( new PairFunction, Double, String>() { @Override - public Tuple2 call(Tuple2 topicHappiness) - throws Exception { - return new Tuple2(topicHappiness._2(), + public Tuple2 call(Tuple2 topicHappiness) { + return new Tuple2<>(topicHappiness._2(), topicHappiness._1()); } }); @@ -151,17 +146,17 @@ public class JavaTwitterHashTagJoinSentiments { JavaPairDStream happiest10 = happinessTopicPairs.transformToPair( new Function, JavaPairRDD>() { @Override - public JavaPairRDD call(JavaPairRDD happinessAndTopics) throws Exception { + public JavaPairRDD call( + JavaPairRDD happinessAndTopics) { return happinessAndTopics.sortByKey(false); } } ); // Print hash tags with the most positive sentiment values - happiest10.foreachRDD(new Function, Void>() { + happiest10.foreachRDD(new VoidFunction>() { @Override - public Void call(JavaPairRDD happinessTopicPairs) throws Exception { + public void call(JavaPairRDD happinessTopicPairs) { List> topList = happinessTopicPairs.take(10); System.out.println( String.format("\nHappiest topics in last 10 seconds (%s total):", @@ -170,7 +165,6 @@ public class JavaTwitterHashTagJoinSentiments { System.out.println( String.format("%s (%s happiness)", pair._2(), pair._1())); } - return null; } }); diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 04dec57b71..e4486b949f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -74,7 +74,7 @@ object SparkHdfsLR { val conf = new Configuration() val sc = new SparkContext(sparkConf) val lines = sc.textFile(inputPath) - val points = lines.map(parsePoint _).cache() + val points = lines.map(parsePoint).cache() val ITERATIONS = args(1).toInt // Initialize w to a random value diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala index ddc99d3f90..8b739c9d7c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -71,7 +71,7 @@ object SparkTachyonHdfsLR { val conf = new Configuration() val sc = new SparkContext(sparkConf) val lines = sc.textFile(inputPath) - val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP) + val points = 
lines.map(parsePoint).persist(StorageLevel.OFF_HEAP) val ITERATIONS = args(1).toInt // Initialize w to a random value diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index fbdfbf7e50..4891e4f4a1 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -35,6 +35,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -130,17 +131,15 @@ public class JavaDirectKafkaStreamSuite implements Serializable { JavaDStream unifiedStream = stream1.union(stream2); final Set result = Collections.synchronizedSet(new HashSet()); - unifiedStream.foreachRDD( - new Function, Void>() { + unifiedStream.foreachRDD(new VoidFunction>() { @Override - public Void call(JavaRDD rdd) { + public void call(JavaRDD rdd) { result.addAll(rdd.collect()); for (OffsetRange o : offsetRanges.get()) { System.out.println( o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() ); } - return null; } } ); diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 1e69de46cd..617c92a008 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -31,6 +31,7 @@ import org.junit.Test; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; @@ -103,10 +104,9 @@ public class JavaKafkaStreamSuite implements Serializable { } ); - words.countByValue().foreachRDD( - new Function, Void>() { + words.countByValue().foreachRDD(new VoidFunction>() { @Override - public Void call(JavaPairRDD rdd) { + public void call(JavaPairRDD rdd) { List> ret = rdd.collect(); for (Tuple2 r : ret) { if (result.containsKey(r._1())) { @@ -115,8 +115,6 @@ public class JavaKafkaStreamSuite implements Serializable { result.put(r._1(), r._2()); } } - - return null; } } ); diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 6fe24fe811..78263f9dca 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -137,8 +137,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun // Verify that the generated KinesisBackedBlockRDD has the all the right information val blockInfos = Seq(blockInfo1, blockInfo2) val nonEmptyRDD = 
kinesisStream.createBlockRDD(time, blockInfos) - nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]] - val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] + nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]] + val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]] assert(kinesisRDD.regionName === dummyRegionName) assert(kinesisRDD.endpointUrl === dummyEndpointUrl) assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds) @@ -203,7 +203,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun Seconds(10), StorageLevel.MEMORY_ONLY, addFive, awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - stream shouldBe a [ReceiverInputDStream[Int]] + stream shouldBe a [ReceiverInputDStream[_]] val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] stream.foreachRDD { rdd => @@ -272,7 +272,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun times.foreach { time => val (arrayOfSeqNumRanges, data) = collectedData(time) val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]] - rdd shouldBe a [KinesisBackedBlockRDD[Array[Byte]]] + rdd shouldBe a [KinesisBackedBlockRDD[_]] // Verify the recovered sequence ranges val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index e47c4db629..ca11ede4cc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer import org.apache.spark.Logging -import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, scal} import org.apache.spark.mllib.util.MLUtils @@ -107,7 +107,7 @@ class KMeans private ( * Number of runs of the algorithm to execute in parallel. */ @Since("1.4.0") - @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0") + @deprecated("Support for runs is deprecated. This param will have no effect in 2.0.0.", "1.6.0") def getRuns: Int = runs /** @@ -117,7 +117,7 @@ class KMeans private ( * return the best clustering found over any run. Default: 1. */ @Since("0.8.0") - @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0") + @deprecated("Support for runs is deprecated. 
This param will have no effect in 2.0.0.", "1.6.0") def setRuns(runs: Int): this.type = { if (runs <= 0) { throw new IllegalArgumentException("Number of runs must be positive") @@ -431,7 +431,7 @@ class KMeans private ( val rs = (0 until runs).filter { r => rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r) } - if (rs.length > 0) Some(p, rs) else None + if (rs.length > 0) Some((p, rs)) else None } }.collect() mergeNewCenters() diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index 271dda4662..a6631ed7eb 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -56,10 +56,10 @@ public class JavaALSSuite implements Serializable { double matchThreshold, boolean implicitPrefs, DoubleMatrix truePrefs) { - List> localUsersProducts = new ArrayList(users * products); + List> localUsersProducts = new ArrayList<>(users * products); for (int u=0; u < users; ++u) { for (int p=0; p < products; ++p) { - localUsersProducts.add(new Tuple2(u, p)); + localUsersProducts.add(new Tuple2<>(u, p)); } } JavaPairRDD usersProducts = sc.parallelizePairs(localUsersProducts); diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java index 32c2f4f339..3db9b39e74 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java @@ -36,11 +36,11 @@ import org.apache.spark.api.java.JavaSparkContext; public class JavaIsotonicRegressionSuite implements Serializable { private transient JavaSparkContext sc; - private List> generateIsotonicInput(double[] labels) { - ArrayList> input = new ArrayList(labels.length); + private static List> generateIsotonicInput(double[] labels) { + List> input = new ArrayList<>(labels.length); for (int i = 1; i <= labels.length; i++) { - input.add(new Tuple3(labels[i-1], (double) i, 1d)); + input.add(new Tuple3<>(labels[i-1], (double) i, 1.0)); } return input; @@ -70,7 +70,7 @@ public class JavaIsotonicRegressionSuite implements Serializable { runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12}); Assert.assertArrayEquals( - new double[] {1, 2, 7d/3, 7d/3, 6, 7, 8, 10, 10, 12}, model.predictions(), 1e-14); + new double[] {1, 2, 7.0/3, 7.0/3, 6, 7, 8, 10, 10, 12}, model.predictions(), 1.0e-14); } @Test @@ -81,10 +81,10 @@ public class JavaIsotonicRegressionSuite implements Serializable { JavaDoubleRDD testRDD = sc.parallelizeDoubles(Arrays.asList(0.0, 1.0, 9.5, 12.0, 13.0)); List predictions = model.predict(testRDD).collect(); - Assert.assertTrue(predictions.get(0) == 1d); - Assert.assertTrue(predictions.get(1) == 1d); - Assert.assertTrue(predictions.get(2) == 10d); - Assert.assertTrue(predictions.get(3) == 12d); - Assert.assertTrue(predictions.get(4) == 12d); + Assert.assertEquals(1.0, predictions.get(0).doubleValue(), 1.0e-14); + Assert.assertEquals(1.0, predictions.get(1).doubleValue(), 1.0e-14); + Assert.assertEquals(10.0, predictions.get(2).doubleValue(), 1.0e-14); + Assert.assertEquals(12.0, predictions.get(3).doubleValue(), 1.0e-14); + Assert.assertEquals(12.0, predictions.get(4).doubleValue(), 1.0e-14); } } diff --git a/python/pyspark/mllib/clustering.py 
b/python/pyspark/mllib/clustering.py index 48daa87e82..d22a7f4c3b 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -173,7 +173,7 @@ class KMeans(object): """Train a k-means clustering model.""" if runs != 1: warnings.warn( - "Support for runs is deprecated in 1.6.0. This param will have no effect in 1.7.0.") + "Support for runs is deprecated in 1.6.0. This param will have no effect in 2.0.0.") clusterInitialModel = [] if initialModel is not None: if not isinstance(initialModel, KMeansModel): diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index f869a96edb..e028d22a54 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -57,8 +57,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { (result, expected) match { case (result: Array[Byte], expected: Array[Byte]) => java.util.Arrays.equals(result, expected) - case (result: Double, expected: Spread[Double]) => - expected.isWithin(result) + case (result: Double, expected: Spread[Double @unchecked]) => + expected.asInstanceOf[Spread[Double]].isWithin(result) case _ => result == expected } } @@ -275,8 +275,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { (result, expected) match { case (result: Array[Byte], expected: Array[Byte]) => java.util.Arrays.equals(result, expected) - case (result: Double, expected: Spread[Double]) => - expected.isWithin(result) + case (result: Double, expected: Spread[Double @unchecked]) => + expected.asInstanceOf[Spread[Double]].isWithin(result) case (result: Double, expected: Double) if result.isNaN && expected.isNaN => true case (result: Float, expected: Float) if result.isNaN && expected.isNaN => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index d5f1c4d74e..6745b4b6c3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -384,9 +384,6 @@ class DateTimeUtilsSuite extends SparkFunSuite { Timestamp.valueOf("1700-02-28 12:14:50.123456")).foreach { t => val us = fromJavaTimestamp(t) assert(toJavaTimestamp(us) === t) - assert(getHours(us) === t.getHours) - assert(getMinutes(us) === t.getMinutes) - assert(getSeconds(us) === t.getSeconds) } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index f8e32d60a4..6bcd155ccd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -62,7 
+63,7 @@ import org.apache.parquet.schema.Types; import org.apache.spark.sql.types.StructType; /** - * Base class for custom RecordReaaders for Parquet that directly materialize to `T`. + * Base class for custom RecordReaders for Parquet that directly materialize to `T`. * This class handles computing row groups, filtering on them, setting up the column readers, * etc. * This is heavily based on parquet-mr's RecordReader. @@ -83,6 +84,7 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader extends RecordReader fileMetadata = footer.getFileMetaData().getKeyValueMetaData(); - ReadSupport readSupport = getReadSupportInstance( - (Class>) getReadSupportClass(configuration)); + ReadSupport readSupport = getReadSupportInstance(getReadSupportClass(configuration)); ReadSupport.ReadContext readContext = readSupport.init(new InitContext( taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema)); this.requestedSchema = readContext.getRequestedSchema(); @@ -282,8 +283,9 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader getReadSupportClass(Configuration configuration) { - return ConfigurationUtil.getClassFromConfig(configuration, + @SuppressWarnings("unchecked") + private Class> getReadSupportClass(Configuration configuration) { + return (Class>) ConfigurationUtil.getClassFromConfig(configuration, ParquetInputFormat.READ_SUPPORT_CLASS, ReadSupport.class); } @@ -294,10 +296,9 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader ReadSupport getReadSupportInstance( Class> readSupportClass){ try { - return readSupportClass.newInstance(); - } catch (InstantiationException e) { - throw new BadConfigurationException("could not instantiate read support class", e); - } catch (IllegalAccessException e) { + return readSupportClass.getConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | + NoSuchMethodException | InvocationTargetException e) { throw new BadConfigurationException("could not instantiate read support class", e); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 076db0c08d..eb4efcd1d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -580,7 +580,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("sparkPartitionId") { + test("spark_partition_id") { // Make sure we have 2 partitions, each with 2 records. 
val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ => Iterator(Tuple1(1), Tuple1(2)) @@ -591,7 +591,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("InputFileName") { + test("input_file_name") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 0e60573dc6..fac26bd0c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.util.{Locale, TimeZone} import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate @@ -206,7 +207,7 @@ abstract class QueryTest extends PlanTest { val jsonString = try { logicalPlan.toJSON } catch { - case e => + case NonFatal(e) => fail( s""" |Failed to parse logical plan to JSON: @@ -231,7 +232,7 @@ abstract class QueryTest extends PlanTest { val jsonBackPlan = try { TreeNode.fromJSON[LogicalPlan](jsonString, sqlContext.sparkContext) } catch { - case e => + case NonFatal(e) => fail( s""" |Failed to rebuild the logical plan from JSON: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala index 97cba1e349..1529313dfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala @@ -60,6 +60,7 @@ object ColumnarTestUtils { case MAP(_) => ArrayBasedMapData( Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32))))) + case _ => throw new IllegalArgumentException(s"Unknown column type $columnType") }).asInstanceOf[JvmType] } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 9722c60bba..ddc56fc869 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -772,8 +772,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @SuppressWarnings("unchecked") @Test public void testForeachRDD() { - final Accumulator accumRdd = ssc.sc().accumulator(0); - final Accumulator accumEle = ssc.sc().accumulator(0); + final Accumulator accumRdd = ssc.sparkContext().accumulator(0); + final Accumulator accumEle = ssc.sparkContext().accumulator(0); List> inputData = Arrays.asList( Arrays.asList(1,1,1), Arrays.asList(1,1,1)); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index bc4bc2eb42..20e2a1c3d5 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -18,6 +18,7 @@ package org.apache.spark.streaming; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -26,10 +27,10 @@ import java.util.Set; 
import scala.Tuple2; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.util.ManualClock; import org.junit.Assert; @@ -51,10 +52,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaPairRDD initialRDD = null; JavaPairDStream wordsDstream = null; - final Function4, State, Optional> - mappingFunc = + Function4, State, Optional> mappingFunc = new Function4, State, Optional>() { - @Override public Optional call( Time time, String word, Optional one, State state) { @@ -76,11 +75,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements .partitioner(new HashPartitioner(10)) .timeout(Durations.seconds(10))); - JavaPairDStream stateSnapshots = stateDstream.stateSnapshots(); + stateDstream.stateSnapshots(); - final Function3, State, Double> mappingFunc2 = + Function3, State, Double> mappingFunc2 = new Function3, State, Double>() { - @Override public Double call(String key, Optional one, State state) { // Use all State's methods here @@ -95,13 +93,13 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaMapWithStateDStream stateDstream2 = wordsDstream.mapWithState( - StateSpec.function(mappingFunc2) + StateSpec.function(mappingFunc2) .initialState(initialRDD) .numPartitions(10) .partitioner(new HashPartitioner(10)) .timeout(Durations.seconds(10))); - JavaPairDStream stateSnapshots2 = stateDstream2.stateSnapshots(); + stateDstream2.stateSnapshots(); } @Test @@ -126,33 +124,21 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements Collections.emptySet() ); + @SuppressWarnings("unchecked") List>> stateData = Arrays.asList( Collections.>emptySet(), - Sets.newHashSet(new Tuple2("a", 1)), - Sets.newHashSet(new Tuple2("a", 2), new Tuple2("b", 1)), - Sets.newHashSet( - new Tuple2("a", 3), - new Tuple2("b", 2), - new Tuple2("c", 1)), - Sets.newHashSet( - new Tuple2("a", 4), - new Tuple2("b", 3), - new Tuple2("c", 1)), - Sets.newHashSet( - new Tuple2("a", 5), - new Tuple2("b", 3), - new Tuple2("c", 1)), - Sets.newHashSet( - new Tuple2("a", 5), - new Tuple2("b", 3), - new Tuple2("c", 1)) + Sets.newHashSet(new Tuple2<>("a", 1)), + Sets.newHashSet(new Tuple2<>("a", 2), new Tuple2<>("b", 1)), + Sets.newHashSet(new Tuple2<>("a", 3), new Tuple2<>("b", 2), new Tuple2<>("c", 1)), + Sets.newHashSet(new Tuple2<>("a", 4), new Tuple2<>("b", 3), new Tuple2<>("c", 1)), + Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1)), + Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new Tuple2<>("c", 1)) ); Function3, State, Integer> mappingFunc = new Function3, State, Integer>() { - @Override - public Integer call(String key, Optional value, State state) throws Exception { + public Integer call(String key, Optional value, State state) { int sum = value.or(0) + (state.exists() ? 
state.get() : 0); state.update(sum); return sum; @@ -160,7 +146,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements }; testOperation( inputData, - StateSpec.function(mappingFunc), + StateSpec.function(mappingFunc), outputData, stateData); } @@ -175,27 +161,25 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements JavaMapWithStateDStream mapWithStateDStream = JavaPairDStream.fromJavaDStream(inputStream.map(new Function>() { @Override - public Tuple2 call(K x) throws Exception { - return new Tuple2(x, 1); + public Tuple2 call(K x) { + return new Tuple2<>(x, 1); } })).mapWithState(mapWithStateSpec); final List> collectedOutputs = - Collections.synchronizedList(Lists.>newArrayList()); - mapWithStateDStream.foreachRDD(new Function, Void>() { + Collections.synchronizedList(new ArrayList>()); + mapWithStateDStream.foreachRDD(new VoidFunction>() { @Override - public Void call(JavaRDD rdd) throws Exception { + public void call(JavaRDD rdd) { collectedOutputs.add(Sets.newHashSet(rdd.collect())); - return null; } }); final List>> collectedStateSnapshots = - Collections.synchronizedList(Lists.>>newArrayList()); - mapWithStateDStream.stateSnapshots().foreachRDD(new Function, Void>() { + Collections.synchronizedList(new ArrayList>>()); + mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction>() { @Override - public Void call(JavaPairRDD rdd) throws Exception { + public void call(JavaPairRDD rdd) { collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())); - return null; } }); BatchCounter batchCounter = new BatchCounter(ssc.ssc()); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index 7a8ef9d147..d09258e0e4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -18,13 +18,14 @@ package org.apache.spark.streaming; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import static org.junit.Assert.*; import com.google.common.io.Closeables; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -68,12 +69,11 @@ public class JavaReceiverAPISuite implements Serializable { return v1 + "."; } }); - mapped.foreachRDD(new Function, Void>() { + mapped.foreachRDD(new VoidFunction>() { @Override - public Void call(JavaRDD rdd) { + public void call(JavaRDD rdd) { long count = rdd.count(); dataCounter.addAndGet(count); - return null; } }); @@ -90,7 +90,7 @@ public class JavaReceiverAPISuite implements Serializable { Thread.sleep(100); } ssc.stop(); - assertTrue(dataCounter.get() > 0); + Assert.assertTrue(dataCounter.get() > 0); } finally { server.stop(); } @@ -98,8 +98,8 @@ public class JavaReceiverAPISuite implements Serializable { private static class JavaSocketReceiver extends Receiver { - String host = null; - int port = -1; + private String host = null; + private int port = -1; JavaSocketReceiver(String host_ , int port_) { super(StorageLevel.MEMORY_AND_DISK()); -- cgit v1.2.3
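
The most common fix in this patch is the same few-line change repeated across the streaming examples and tests: the deprecated foreachRDD(Function<JavaRDD<T>, Void>) overload is replaced by foreachRDD(VoidFunction<JavaRDD<T>>) (or VoidFunction2 where a Time argument is involved), which removes the dangling "return null;". Below is a minimal, self-contained sketch of the new form; the class name, the queue-backed input stream, and the printed output are illustrative only and are not taken from the patch.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ForeachRDDSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDSketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // Toy input: a queue-backed stream standing in for a real receiver or Kafka source.
    Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
    rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("a", "b", "c")));
    JavaDStream<String> lines = jssc.queueStream(rddQueue);

    // New form: VoidFunction<JavaRDD<String>> replaces Function<JavaRDD<String>, Void>,
    // so call() returns void and the trailing "return null;" disappears.
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
      @Override
      public void call(JavaRDD<String> rdd) {
        System.out.println("Batch size: " + rdd.count());
      }
    });

    jssc.start();
    jssc.awaitTerminationOrTimeout(5000);
    jssc.stop();
  }
}

The same shape applies to the two-argument overload used in JavaRecoverableNetworkWordCount and JavaSqlNetworkWordCount, which now take a VoidFunction2<JavaRDD<T>, Time> instead of a Function2 returning Void.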