From de14d35f77071932963a994fac5aec0e5df838a1 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sun, 19 Feb 2017 09:37:56 -0800
Subject: [SPARK-19533][EXAMPLES] Convert Java tests to use lambdas, Java 8
 features

## What changes were proposed in this pull request?

Convert Java tests to use lambdas, Java 8 features.

## How was this patch tested?

Jenkins tests.

Author: Sean Owen

Closes #16961 from srowen/SPARK-19533.
---
 .../org/apache/spark/examples/JavaLogQuery.java    |  21 +---
 .../org/apache/spark/examples/JavaPageRank.java    |  49 +++-----
 .../org/apache/spark/examples/JavaSparkPi.java     |  20 +--
 .../spark/examples/JavaStatusTrackerDemo.java      |   5 +-
 .../java/org/apache/spark/examples/JavaTC.java     |   8 +-
 .../org/apache/spark/examples/JavaWordCount.java   |  27 +----
 .../apache/spark/examples/ml/JavaALSExample.java   |   7 +-
 ...delSelectionViaTrainValidationSplitExample.java |   3 -
 .../spark/examples/ml/JavaTokenizerExample.java    |  13 +-
 .../spark/examples/ml/JavaVectorSlicerExample.java |   7 +-
 .../mllib/JavaAssociationRulesExample.java         |   6 +-
 .../JavaBinaryClassificationMetricsExample.java    |  33 ++---
 .../examples/mllib/JavaBisectingKMeansExample.java |   7 +-
 .../examples/mllib/JavaChiSqSelectorExample.java   |  38 ++----
 .../JavaDecisionTreeClassificationExample.java     |  26 ++--
 .../mllib/JavaDecisionTreeRegressionExample.java   |  33 ++---
 .../mllib/JavaElementwiseProductExample.java       |  27 +----
 .../examples/mllib/JavaGaussianMixtureExample.java |  19 ++-
 .../JavaGradientBoostingClassificationExample.java |  21 +---
 .../JavaGradientBoostingRegressionExample.java     |  30 +----
 .../mllib/JavaIsotonicRegressionExample.java       |  39 ++----
 .../spark/examples/mllib/JavaKMeansExample.java    |  19 ++-
 .../spark/examples/mllib/JavaLBFGSExample.java     |  23 ++--
 .../JavaLatentDirichletAllocationExample.java      |  28 ++---
 .../mllib/JavaLinearRegressionWithSGDExample.java  |  47 +++----
 .../JavaLogisticRegressionWithLBFGSExample.java    |  14 +--
 ...JavaMulticlassClassificationMetricsExample.java |  13 +-
 .../examples/mllib/JavaNaiveBayesExample.java      |  19 +--
 .../mllib/JavaPowerIterationClusteringExample.java |   6 +-
 .../JavaRandomForestClassificationExample.java     |  23 +---
 .../mllib/JavaRandomForestRegressionExample.java   |  37 ++----
 .../examples/mllib/JavaRankingMetricsExample.java  | 135 +++++++--------------
 .../examples/mllib/JavaRecommendationExample.java  |  58 +++-------
 .../mllib/JavaRegressionMetricsExample.java        |  31 ++---
 .../examples/mllib/JavaSVMWithSGDExample.java      |  13 +-
 .../spark/examples/mllib/JavaSimpleFPGrowth.java   |  12 +-
 .../examples/mllib/JavaStreamingTestExample.java   |  40 ++----
 .../examples/sql/JavaSQLDataSourceExample.java     |   8 +-
 .../spark/examples/sql/JavaSparkSQLExample.java    |  60 ++++-----
 .../examples/sql/hive/JavaSparkHiveExample.java    |   9 +-
 .../streaming/JavaStructuredKafkaWordCount.java    |  10 +-
 .../streaming/JavaStructuredNetworkWordCount.java  |  11 +-
 .../JavaStructuredNetworkWordCountWindowed.java    |  16 +--
 .../examples/streaming/JavaCustomReceiver.java     |  34 ++----
 .../streaming/JavaDirectKafkaWordCount.java        |  31 +----
 .../examples/streaming/JavaFlumeEventCount.java    |   8 +-
 .../examples/streaming/JavaKafkaWordCount.java     |  33 +----
 .../examples/streaming/JavaNetworkWordCount.java   |  25 +---
 .../spark/examples/streaming/JavaQueueStream.java  |  24 +---
 .../streaming/JavaRecoverableNetworkWordCount.java |  91 +++++---------
 .../streaming/JavaSqlNetworkWordCount.java         |  51 +++-----
 .../streaming/JavaStatefulNetworkWordCount.java    |  30 ++---
 52 files changed, 380 insertions(+), 1018 deletions(-)

(limited to 'examples/src')
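Every file below follows the same two mechanical rewrites, so a small standalone sketch may help when reading the hunks. Spark's Java function interfaces (Function, PairFunction, FlatMapFunction, Function2, and so on) each declare a single call method, so an anonymous inner class can collapse to a lambda, or to a method reference when its body is a single existing call; error/MSE computations written as map(...).reduce(...) divided by a count become mapToDouble(...).mean(). The class name and data here are illustrative only, not taken from any file in this patch:

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.SparkSession;

    public final class LambdaConversionSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
          .appName("LambdaConversionSketch").master("local[*]").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a", "bb", "ccc"));

        // Java 7 style, as on the "-" lines below: an anonymous class
        // implementing the single-method Function interface.
        JavaRDD<Integer> before = lines.map(new Function<String, Integer>() {
          @Override
          public Integer call(String s) {
            return s.length();
          }
        });

        // Java 8 style, as on the "+" lines below: an equivalent lambda, or a
        // method reference when the body is one existing method call.
        JavaRDD<Integer> after = lines.map(s -> s.length());
        JavaRDD<Integer> afterRef = lines.map(String::length);

        // The MSE/error hunks below similarly replace
        // map(...).reduce(sum) / count with mapToDouble(...).mean().
        double meanLength = lines.mapToDouble(s -> s.length()).mean();

        System.out.println(before.collect() + " " + after.collect() + " "
          + afterRef.collect() + " " + meanLength);
        spark.stop();
      }
    }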
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 7775443861..cf12de390f 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -17,18 +17,16 @@
 
 package org.apache.spark.examples;
 
-import com.google.common.collect.Lists;
 import scala.Tuple2;
 import scala.Tuple3;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -40,7 +38,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaLogQuery {
 
-  public static final List<String> exampleApacheLogs = Lists.newArrayList(
+  public static final List<String> exampleApacheLogs = Arrays.asList(
     "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
     "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
     "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -109,19 +107,10 @@ public final class JavaLogQuery {
     JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
-      @Override
-      public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
-        return new Tuple2<>(extractKey(s), extractStats(s));
-      }
-    });
+    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted =
+      dataSet.mapToPair(s -> new Tuple2<>(extractKey(s), extractStats(s)));
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
-      @Override
-      public Stats call(Stats stats, Stats stats2) {
-        return stats.merge(stats2);
-      }
-    });
+    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(Stats::merge);
 
     List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
     for (Tuple2<?,?> t : output) {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index bcc493bdcb..b5b4703932 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -19,7 +19,6 @@ package org.apache.spark.examples;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
@@ -28,10 +27,7 @@ import com.google.common.collect.Iterables;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 /**
@@ -90,52 +86,35 @@ public final class JavaPageRank {
     JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
-      new PairFunction<String, String, String>() {
-        @Override
-        public Tuple2<String, String> call(String s) {
-          String[] parts = SPACES.split(s);
-          return new Tuple2<>(parts[0], parts[1]);
-        }
-      }).distinct().groupByKey().cache();
+    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
+      String[] parts = SPACES.split(s);
+      return new Tuple2<>(parts[0], parts[1]);
+    }).distinct().groupByKey().cache();
 
     // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
-      @Override
-      public Double call(Iterable<String> rs) {
-        return 1.0;
-      }
-    });
+    JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
 
     // Calculates and updates URL ranks continuously using PageRank algorithm.
     for (int current = 0; current < Integer.parseInt(args[1]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-        .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
-          @Override
-          public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
-            int urlCount = Iterables.size(s._1);
-            List<Tuple2<String, Double>> results = new ArrayList<>();
-            for (String n : s._1) {
-              results.add(new Tuple2<>(n, s._2() / urlCount));
-            }
-            return results.iterator();
+        .flatMapToPair(s -> {
+          int urlCount = Iterables.size(s._1());
+          List<Tuple2<String, Double>> results = new ArrayList<>();
+          for (String n : s._1) {
+            results.add(new Tuple2<>(n, s._2() / urlCount));
           }
-        });
+          return results.iterator();
+        });
 
       // Re-calculates URL ranks based on neighbor contributions.
-      ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
-        @Override
-        public Double call(Double sum) {
-          return 0.15 + sum * 0.85;
-        }
-      });
+      ranks = contribs.reduceByKey(new Sum()).mapValues(sum -> 0.15 + sum * 0.85);
     }
 
     // Collects all URL ranks and dump them to console.
     List<Tuple2<String, Double>> output = ranks.collect();
     for (Tuple2<?,?> tuple : output) {
-        System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
+      System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
     }
 
     spark.stop();
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 89855e81f1..cb4b265690 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.ArrayList;
@@ -49,19 +47,11 @@ public final class JavaSparkPi {
 
     JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
 
-    int count = dataSet.map(new Function<Integer, Integer>() {
-      @Override
-      public Integer call(Integer integer) {
-        double x = Math.random() * 2 - 1;
-        double y = Math.random() * 2 - 1;
-        return (x * x + y * y <= 1) ? 1 : 0;
-      }
-    }).reduce(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer integer, Integer integer2) {
-        return integer + integer2;
-      }
-    });
+    int count = dataSet.map(integer -> {
+      double x = Math.random() * 2 - 1;
+      double y = Math.random() * 2 - 1;
+      return (x * x + y * y <= 1) ? 1 : 0;
+    }).reduce((integer, integer2) -> integer + integer2);
 
     System.out.println("Pi is roughly " + 4.0 * count / n);
 
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
index 6f899c772e..b0ebedfed6 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
@@ -25,7 +25,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.SparkSession;
 
-
 import java.util.Arrays;
 import java.util.List;
 
@@ -50,11 +49,11 @@ public final class JavaStatusTrackerDemo {
       .appName(APP_NAME)
       .getOrCreate();
 
-    final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
 
     // Example of implementing a progress reporter for a simple job.
     JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
-        new IdentityWithDelay<Integer>());
+        new IdentityWithDelay<>());
 
     JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
     while (!jobFuture.isDone()) {
       Thread.sleep(1000);  // 1 second
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index f12ca77ed1..bde30b84d6 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -80,13 +80,7 @@ public final class JavaTC {
     // the graph to obtain the path (x, z).
 
     // Because join() joins on keys, the edges are stored in reversed order.
-    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
-      new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
-          return new Tuple2<>(e._2(), e._1());
-        }
-      });
+    JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1()));
 
     long oldCount;
     long nextCount = tc.count();
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 8f18604c07..f1ce1e9585 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -21,13 +21,9 @@ import scala.Tuple2;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -48,28 +44,11 @@ public final class JavaWordCount {
 
     JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
-    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String s) {
-        return Arrays.asList(SPACE.split(s)).iterator();
-      }
-    });
+    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
 
-    JavaPairRDD<String, Integer> ones = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      });
+    JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
 
-    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
 
     List<Tuple2<String, Integer>> output = counts.collect();
     for (Tuple2<?,?> tuple : output) {
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 739558e81f..33ba668b32 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession;
 import java.io.Serializable;
 
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.ml.evaluation.RegressionEvaluator;
 import org.apache.spark.ml.recommendation.ALS;
 import org.apache.spark.ml.recommendation.ALSModel;
@@ -88,11 +87,7 @@ public class JavaALSExample {
     // $example on$
     JavaRDD<Rating> ratingsRDD = spark
       .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
-      .map(new Function<String, Rating>() {
-        public Rating call(String str) {
-          return Rating.parseRating(str);
-        }
-      });
+      .map(Rating::parseRating);
     Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
     Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
     Dataset<Row> training = splits[0];
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
index 0f96293f03..9a4722b90c 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
@@ -32,9 +32,6 @@ import org.apache.spark.sql.SparkSession;
 /**
  * Java example demonstrating model selection using TrainValidationSplit.
  *
- * The example is based on {@link org.apache.spark.examples.ml.JavaSimpleParamsExample}
- * using linear regression.
- *
  * Run with
  * {{{
  * bin/run-example ml.JavaModelSelectionViaTrainValidationSplitExample
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index 004e9b12f6..3f809eba7f 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -69,20 +69,17 @@ public class JavaTokenizerExample {
         .setOutputCol("words")
         .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);
 
-    spark.udf().register("countTokens", new UDF1<WrappedArray<String>, Integer>() {
-      @Override
-      public Integer call(WrappedArray<String> words) {
-        return words.size();
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("countTokens", (WrappedArray<String> words) -> words.size(), DataTypes.IntegerType);
 
     Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
     tokenized.select("sentence", "words")
-        .withColumn("tokens", callUDF("countTokens", col("words"))).show(false);
+        .withColumn("tokens", callUDF("countTokens", col("words")))
+        .show(false);
 
     Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
     regexTokenized.select("sentence", "words")
-        .withColumn("tokens", callUDF("countTokens", col("words"))).show(false);
+        .withColumn("tokens", callUDF("countTokens", col("words")))
+        .show(false);
     // $example off$
 
     spark.stop();
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
index 1922514c87..1ae48be266 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
@@ -20,10 +20,9 @@ package org.apache.spark.examples.ml;
 import org.apache.spark.sql.SparkSession;
 
 // $example on$
+import java.util.Arrays;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-
 import org.apache.spark.ml.attribute.Attribute;
 import org.apache.spark.ml.attribute.AttributeGroup;
 import org.apache.spark.ml.attribute.NumericAttribute;
@@ -43,14 +42,14 @@ public class JavaVectorSlicerExample {
       .getOrCreate();
 
     // $example on$
-    Attribute[] attrs = new Attribute[]{
+    Attribute[] attrs = {
      NumericAttribute.defaultAttr().withName("f1"),
      NumericAttribute.defaultAttr().withName("f2"),
      NumericAttribute.defaultAttr().withName("f3")
     };
     AttributeGroup group = new AttributeGroup("userFeatures", attrs);
 
-    List<Row> data = Lists.newArrayList(
+    List<Row> data = Arrays.asList(
       RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
       RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
     );
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
index 189560e3fe..5f43603f4f 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
@@ -38,9 +38,9 @@ public class JavaAssociationRulesExample {
 
     // $example on$
     JavaRDD<FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
-      new FreqItemset<String>(new String[] {"a"}, 15L),
-      new FreqItemset<String>(new String[] {"b"}, 35L),
-      new FreqItemset<String>(new String[] {"a", "b"}, 12L)
+      new FreqItemset<>(new String[] {"a"}, 15L),
+      new FreqItemset<>(new String[] {"b"}, 35L),
+      new FreqItemset<>(new String[] {"a", "b"}, 12L)
{"a", "b"}, 12L) )); AssociationRules arules = new AssociationRules() diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java index 12aa14f710..b9d0313c6b 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java @@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; @@ -46,7 +45,7 @@ public class JavaBinaryClassificationMetricsExample { JavaRDD test = splits[1]; // Run training algorithm to build the model. - final LogisticRegressionModel model = new LogisticRegressionWithLBFGS() + LogisticRegressionModel model = new LogisticRegressionWithLBFGS() .setNumClasses(2) .run(training.rdd()); @@ -54,15 +53,8 @@ public class JavaBinaryClassificationMetricsExample { model.clearThreshold(); // Compute raw scores on the test set. - JavaRDD> predictionAndLabels = test.map( - new Function>() { - @Override - public Tuple2 call(LabeledPoint p) { - Double prediction = model.predict(p.features()); - return new Tuple2(prediction, p.label()); - } - } - ); + JavaPairRDD predictionAndLabels = test.mapToPair(p -> + new Tuple2<>(model.predict(p.features()), p.label())); // Get evaluation metrics. BinaryClassificationMetrics metrics = @@ -73,32 +65,25 @@ public class JavaBinaryClassificationMetricsExample { System.out.println("Precision by threshold: " + precision.collect()); // Recall by threshold - JavaRDD> recall = metrics.recallByThreshold().toJavaRDD(); + JavaRDD recall = metrics.recallByThreshold().toJavaRDD(); System.out.println("Recall by threshold: " + recall.collect()); // F Score by threshold - JavaRDD> f1Score = metrics.fMeasureByThreshold().toJavaRDD(); + JavaRDD f1Score = metrics.fMeasureByThreshold().toJavaRDD(); System.out.println("F1 Score by threshold: " + f1Score.collect()); - JavaRDD> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD(); + JavaRDD f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD(); System.out.println("F2 Score by threshold: " + f2Score.collect()); // Precision-recall curve - JavaRDD> prc = metrics.pr().toJavaRDD(); + JavaRDD prc = metrics.pr().toJavaRDD(); System.out.println("Precision-recall curve: " + prc.collect()); // Thresholds - JavaRDD thresholds = precision.map( - new Function, Double>() { - @Override - public Double call(Tuple2 t) { - return new Double(t._1().toString()); - } - } - ); + JavaRDD thresholds = precision.map(t -> Double.parseDouble(t._1().toString())); // ROC Curve - JavaRDD> roc = metrics.roc().toJavaRDD(); + JavaRDD roc = metrics.roc().toJavaRDD(); System.out.println("ROC curve: " + roc.collect()); // AUPRC diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java index c600094947..f878b55a98 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java @@ 
@@ -17,10 +17,9 @@
 
 package org.apache.spark.examples.mllib;
 
-import java.util.ArrayList;
-
 // $example on$
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.List;
 // $example off$
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,7 +40,7 @@ public class JavaBisectingKMeansExample {
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
     // $example on$
-    ArrayList<Vector> localData = Lists.newArrayList(
+    List<Vector> localData = Arrays.asList(
       Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
       Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
       Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
index ad44acb4cd..ce354af2b5 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
@@ -19,10 +19,8 @@ package org.apache.spark.examples.mllib;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.feature.ChiSqSelector;
 import org.apache.spark.mllib.feature.ChiSqSelectorModel;
 import org.apache.spark.mllib.linalg.Vectors;
@@ -42,41 +40,25 @@ public class JavaChiSqSelectorExample {
 
     // Discretize data in 16 equal bins since ChiSqSelector requires categorical features
     // Although features are doubles, the ChiSqSelector treats each unique value as a category
-    JavaRDD<LabeledPoint> discretizedData = points.map(
-      new Function<LabeledPoint, LabeledPoint>() {
-        @Override
-        public LabeledPoint call(LabeledPoint lp) {
-          final double[] discretizedFeatures = new double[lp.features().size()];
-          for (int i = 0; i < lp.features().size(); ++i) {
-            discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
-          }
-          return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
-        }
+    JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
+      double[] discretizedFeatures = new double[lp.features().size()];
+      for (int i = 0; i < lp.features().size(); ++i) {
+        discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
       }
-    );
+      return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
+    });
 
     // Create ChiSqSelector that will select top 50 of 692 features
     ChiSqSelector selector = new ChiSqSelector(50);
     // Create ChiSqSelector model (selecting features)
-    final ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
+    ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
     // Filter the top 50 features from each feature vector
-    JavaRDD<LabeledPoint> filteredData = discretizedData.map(
-      new Function<LabeledPoint, LabeledPoint>() {
-        @Override
-        public LabeledPoint call(LabeledPoint lp) {
-          return new LabeledPoint(lp.label(), transformer.transform(lp.features()));
-        }
-      }
-    );
+    JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
+      new LabeledPoint(lp.label(), transformer.transform(lp.features())));
     // $example off$
 
     System.out.println("filtered data: ");
-    filteredData.foreach(new VoidFunction<LabeledPoint>() {
-      @Override
-      public void call(LabeledPoint labeledPoint) throws Exception {
-        System.out.println(labeledPoint.toString());
-      }
-    });
+    filteredData.foreach(System.out::println);
 
     jsc.stop();
   }
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
index 66387b9df5..032c168b94 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
@@ -27,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.DecisionTree;
 import org.apache.spark.mllib.tree.model.DecisionTreeModel;
@@ -53,31 +51,21 @@ class JavaDecisionTreeClassificationExample {
 
     // Set parameters.
     // Empty categoricalFeaturesInfo indicates all features are continuous.
-    Integer numClasses = 2;
+    int numClasses = 2;
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     String impurity = "gini";
-    Integer maxDepth = 5;
-    Integer maxBins = 32;
+    int maxDepth = 5;
+    int maxBins = 32;
 
     // Train a DecisionTree model for classification.
-    final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
+    DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
       categoricalFeaturesInfo, impurity, maxDepth, maxBins);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testErr =
-      1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Double, Double> pl) {
-          return !pl._1().equals(pl._2());
-        }
-      }).count() / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testErr =
+      predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
 
     System.out.println("Test Error: " + testErr);
     System.out.println("Learned classification tree model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
index 904e7f7e95..f222c38fc8 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
@@ -27,9 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.DecisionTree;
 import org.apache.spark.mllib.tree.model.DecisionTreeModel;
@@ -56,34 +53,20 @@
     // Empty categoricalFeaturesInfo indicates all features are continuous.
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     String impurity = "variance";
-    Integer maxDepth = 5;
-    Integer maxBins = 32;
+    int maxDepth = 5;
+    int maxBins = 32;
 
     // Train a DecisionTree model.
-    final DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
+    DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
       categoricalFeaturesInfo, impurity, maxDepth, maxBins);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testMSE =
-      predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
-        @Override
-        public Double call(Tuple2<Double, Double> pl) {
-          Double diff = pl._1() - pl._2();
-          return diff * diff;
-        }
-      }).reduce(new Function2<Double, Double, Double>() {
-        @Override
-        public Double call(Double a, Double b) {
-          return a + b;
-        }
-      }) / data.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testMSE = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
 
     System.out.println("Test Mean Squared Error: " + testMSE);
     System.out.println("Learned regression tree model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
index c8ce6ab284..2d45c6166f 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
@@ -25,12 +25,10 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.feature.ElementwiseProduct;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.linalg.Vectors;
 // $example off$
-import org.apache.spark.api.java.function.VoidFunction;
 
 public class JavaElementwiseProductExample {
   public static void main(String[] args) {
@@ -43,35 +41,18 @@ public class JavaElementwiseProductExample {
     JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
       Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
     Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
-    final ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
+    ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
 
     // Batch transform and per-row transform give the same results:
     JavaRDD<Vector> transformedData = transformer.transform(data);
-    JavaRDD<Vector> transformedData2 = data.map(
-      new Function<Vector, Vector>() {
-        @Override
-        public Vector call(Vector v) {
-          return transformer.transform(v);
-        }
-      }
-    );
+    JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
     // $example off$
 
     System.out.println("transformedData: ");
-    transformedData.foreach(new VoidFunction<Vector>() {
-      @Override
-      public void call(Vector vector) throws Exception {
-        System.out.println(vector.toString());
-      }
-    });
+    transformedData.foreach(System.out::println);
 
     System.out.println("transformedData2: ");
-    transformedData2.foreach(new VoidFunction<Vector>() {
-      @Override
-      public void call(Vector vector) throws Exception {
-        System.out.println(vector.toString());
-      }
-    });
+    transformedData2.foreach(System.out::println);
 
     jsc.stop();
   }
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
index 3124411c82..5792e5a71c 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
@@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.clustering.GaussianMixture;
 import org.apache.spark.mllib.clustering.GaussianMixtureModel;
 import org.apache.spark.mllib.linalg.Vector;
@@ -39,18 +38,14 @@ public class JavaGaussianMixtureExample {
     // Load and parse data
     String path = "data/mllib/gmm_data.txt";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.trim().split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++) {
-            values[i] = Double.parseDouble(sarray[i]);
-          }
-          return Vectors.dense(values);
-        }
+    JavaRDD<Vector> parsedData = data.map(s -> {
+      String[] sarray = s.trim().split(" ");
+      double[] values = new double[sarray.length];
+      for (int i = 0; i < sarray.length; i++) {
+        values[i] = Double.parseDouble(sarray[i]);
       }
-    );
+      return Vectors.dense(values);
+    });
     parsedData.cache();
 
     // Cluster the data into two classes using GaussianMixture
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
index 213949e525..521ee96fbd 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
@@ -27,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.GradientBoostedTrees;
 import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
@@ -61,24 +59,13 @@ public class JavaGradientBoostingClassificationExample {
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
 
-    final GradientBoostedTreesModel model =
-      GradientBoostedTrees.train(trainingData, boostingStrategy);
+    GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testErr =
-      1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Double, Double> pl) {
-          return !pl._1().equals(pl._2());
-        }
-      }).count() / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testErr =
+      predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
testData.count(); System.out.println("Test Error: " + testErr); System.out.println("Learned classification GBT model:\n" + model.toDebugString()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java index 78db442dbc..b345d19f59 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java @@ -24,12 +24,9 @@ import java.util.Map; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.GradientBoostedTrees; import org.apache.spark.mllib.tree.configuration.BoostingStrategy; @@ -60,30 +57,15 @@ public class JavaGradientBoostingRegressionExample { Map categoricalFeaturesInfo = new HashMap<>(); boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo); - final GradientBoostedTreesModel model = - GradientBoostedTrees.train(trainingData, boostingStrategy); + GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy); // Evaluate model on test instances and compute test error JavaPairRDD predictionAndLabel = - testData.mapToPair(new PairFunction() { - @Override - public Tuple2 call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testMSE = - predictionAndLabel.map(new Function, Double>() { - @Override - public Double call(Tuple2 pl) { - Double diff = pl._1() - pl._2(); - return diff * diff; - } - }).reduce(new Function2() { - @Override - public Double call(Double a, Double b) { - return a + b; - } - }) / data.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testMSE = predictionAndLabel.mapToDouble(pl -> { + double diff = pl._1() - pl._2(); + return diff * diff; + }).mean(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression GBT model:\n" + model.toDebugString()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java index a30b5f1f73..adebafe4b8 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java @@ -20,9 +20,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import scala.Tuple3; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; @@ -42,14 +39,8 @@ public class JavaIsotonicRegressionExample { jsc.sc(), "data/mllib/sample_isotonic_regression_libsvm_data.txt").toJavaRDD(); // Create label, feature, weight tuples from input data with weight set to default value 1.0. 
-    JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(
-      new Function<LabeledPoint, Tuple3<Double, Double, Double>>() {
-        public Tuple3<Double, Double, Double> call(LabeledPoint point) {
-          return new Tuple3<>(new Double(point.label()),
-            new Double(point.features().apply(0)), 1.0);
-        }
-      }
-    );
+    JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(point ->
+      new Tuple3<>(point.label(), point.features().apply(0), 1.0));
 
     // Split data into training (60%) and test (40%) sets.
     JavaRDD<Tuple3<Double, Double, Double>>[] splits =
@@ -59,29 +50,17 @@
 
     // Create isotonic regression model from training data.
     // Isotonic parameter defaults to true so it is only shown for demonstration
-    final IsotonicRegressionModel model =
-      new IsotonicRegression().setIsotonic(true).run(training);
+    IsotonicRegressionModel model = new IsotonicRegression().setIsotonic(true).run(training);
 
     // Create tuples of predicted and real labels.
-    JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(
-      new PairFunction<Tuple3<Double, Double, Double>, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(Tuple3<Double, Double, Double> point) {
-          Double predictedLabel = model.predict(point._2());
-          return new Tuple2<>(predictedLabel, point._1());
-        }
-      }
-    );
+    JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(point ->
+      new Tuple2<>(model.predict(point._2()), point._1()));
 
     // Calculate mean squared error between predicted and real labels.
-    Double meanSquaredError = new JavaDoubleRDD(predictionAndLabel.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        @Override
-        public Object call(Tuple2<Double, Double> pl) {
-          return Math.pow(pl._1() - pl._2(), 2);
-        }
-      }
-    ).rdd()).mean();
+    double meanSquaredError = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
 
     System.out.println("Mean Squared Error = " + meanSquaredError);
 
     // Save and load model
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
index 2d89c768fc..f17275617a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
@@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 // $example on$
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.clustering.KMeans;
 import org.apache.spark.mllib.clustering.KMeansModel;
 import org.apache.spark.mllib.linalg.Vector;
@@ -39,18 +38,14 @@ public class JavaKMeansExample {
     // Load and parse data
     String path = "data/mllib/kmeans_data.txt";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++) {
-            values[i] = Double.parseDouble(sarray[i]);
-          }
-          return Vectors.dense(values);
-        }
+    JavaRDD<Vector> parsedData = data.map(s -> {
+      String[] sarray = s.split(" ");
+      double[] values = new double[sarray.length];
+      for (int i = 0; i < sarray.length; i++) {
+        values[i] = Double.parseDouble(sarray[i]);
       }
-    );
+      return Vectors.dense(values);
+    });
     parsedData.cache();
 
     // Cluster the data into two classes using KMeans
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
index f6f91f486f..3fdc03a92a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
 import org.apache.spark.mllib.linalg.Vector;
@@ -50,12 +49,8 @@ public class JavaLBFGSExample {
     JavaRDD<LabeledPoint> test = data.subtract(trainingInit);
 
     // Append 1 into the training data as intercept.
-    JavaRDD<Tuple2<Object, Vector>> training = data.map(
-      new Function<LabeledPoint, Tuple2<Object, Vector>>() {
-        public Tuple2<Object, Vector> call(LabeledPoint p) {
-          return new Tuple2<Object, Vector>(p.label(), MLUtils.appendBias(p.features()));
-        }
-      });
+    JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
+      new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
     training.cache();
 
     // Run training algorithm to build the model.
@@ -77,7 +72,7 @@
     Vector weightsWithIntercept = result._1();
     double[] loss = result._2();
 
-    final LogisticRegressionModel model = new LogisticRegressionModel(
+    LogisticRegressionModel model = new LogisticRegressionModel(
       Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
       (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);
 
@@ -85,13 +80,8 @@
     model.clearThreshold();
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double score = model.predict(p.features());
-          return new Tuple2<Object, Object>(score, p.label());
-        }
-      });
+    JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     BinaryClassificationMetrics metrics =
@@ -99,8 +89,9 @@
     double auROC = metrics.areaUnderROC();
 
     System.out.println("Loss of each step in training process");
-    for (double l : loss)
+    for (double l : loss) {
       System.out.println(l);
+    }
     System.out.println("Area under ROC = " + auROC);
     // $example off$
 
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
index 578564eeb2..887edf8c21 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
@@ -25,7 +25,6 @@ import scala.Tuple2;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.clustering.DistributedLDAModel;
 import org.apache.spark.mllib.clustering.LDA;
 import org.apache.spark.mllib.clustering.LDAModel;
@@ -44,28 +43,17 @@ public class JavaLatentDirichletAllocationExample {
     // Load and parse the data
     String path = "data/mllib/sample_lda_data.txt";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.trim().split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++) {
-            values[i] = Double.parseDouble(sarray[i]);
-          }
-          return Vectors.dense(values);
-        }
+    JavaRDD<Vector> parsedData = data.map(s -> {
+      String[] sarray = s.trim().split(" ");
+      double[] values = new double[sarray.length];
+      for (int i = 0; i < sarray.length; i++) {
+        values[i] = Double.parseDouble(sarray[i]);
       }
-    );
+      return Vectors.dense(values);
+    });
 
     // Index documents with unique IDs
     JavaPairRDD<Long, Vector> corpus =
-      JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
-        new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
-          public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
-            return doc_id.swap();
-          }
-        }
-      )
-    );
+      JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
     corpus.cache();
 
     // Cluster the documents into three topics using LDA
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
index 9ca9a7847c..324a781c1a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
@@ -23,9 +23,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 // $example on$
 import scala.Tuple2;
 
-import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -44,43 +43,31 @@ public class JavaLinearRegressionWithSGDExample {
     // Load and parse the data
     String path = "data/mllib/ridge-data/lpsa.data";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<LabeledPoint> parsedData = data.map(
-      new Function<String, LabeledPoint>() {
-        public LabeledPoint call(String line) {
-          String[] parts = line.split(",");
-          String[] features = parts[1].split(" ");
-          double[] v = new double[features.length];
-          for (int i = 0; i < features.length - 1; i++) {
-            v[i] = Double.parseDouble(features[i]);
-          }
-          return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
-        }
+    JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+      String[] parts = line.split(",");
+      String[] features = parts[1].split(" ");
+      double[] v = new double[features.length];
+      for (int i = 0; i < features.length - 1; i++) {
+        v[i] = Double.parseDouble(features[i]);
       }
-    );
+      return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+    });
     parsedData.cache();
 
     // Building the model
     int numIterations = 100;
     double stepSize = 0.00000001;
-    final LinearRegressionModel model =
+    LinearRegressionModel model =
       LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);
 
     // Evaluate model on training examples and compute training error
-    JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
-      new Function<LabeledPoint, Tuple2<Double, Double>>() {
-        public Tuple2<Double, Double> call(LabeledPoint point) {
-          double prediction = model.predict(point.features());
-          return new Tuple2<>(prediction, point.label());
-        }
-      }
-    );
-    double MSE = new JavaDoubleRDD(valuesAndPreds.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        public Object call(Tuple2<Double, Double> pair) {
-          return Math.pow(pair._1() - pair._2(), 2.0);
-        }
-      }
-    ).rdd()).mean();
+    JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point ->
+      new Tuple2<>(model.predict(point.features()), point.label()));
+
+    double MSE = valuesAndPreds.mapToDouble(pair -> {
+      double diff = pair._1() - pair._2();
+      return diff * diff;
+    }).mean();
     System.out.println("training Mean Squared Error = " + MSE);
 
     // Save and load model
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
index 7fc371ec0f..26b8a6e9fa 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
@@ -23,8 +23,8 @@ import org.apache.spark.SparkContext;
 // $example on$
 import scala.Tuple2;
 
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -49,19 +49,13 @@ public class JavaLogisticRegressionWithLBFGSExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(10)
       .run(training.rdd());
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
index 2d12bdd2a6..03670383b7 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -46,19 +45,13 @@ public class JavaMulticlassClassificationMetricsExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(3)
       .run(training.rdd());
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
index f4ec04b0c6..d80dbe8000 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples.mllib;
 
 // $example on$
 import scala.Tuple2;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,20 +39,11 @@ public class JavaNaiveBayesExample {
     JavaRDD<LabeledPoint>[] tmp = inputData.randomSplit(new double[]{0.6, 0.4});
     JavaRDD<LabeledPoint> training = tmp[0]; // training set
     JavaRDD<LabeledPoint> test = tmp[1]; // test set
-    final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
+    NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
     JavaPairRDD<Double, Double> predictionAndLabel =
-      test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    double accuracy = predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-      @Override
-      public Boolean call(Tuple2<Double, Double> pl) {
-        return pl._1().equals(pl._2());
-      }
-    }).count() / (double) test.count();
+      test.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double accuracy =
+      predictionAndLabel.filter(pl -> pl._1().equals(pl._2())).count() / (double) test.count();
 
     // Save and load model
     model.save(jsc.sc(), "target/tmp/myNaiveBayesModel");
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
index 91c3bd72da..5155f182ba 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
@@ -17,9 +17,9 @@
 
 package org.apache.spark.examples.mllib;
 
-import scala.Tuple3;
+import java.util.Arrays;
 
-import com.google.common.collect.Lists;
+import scala.Tuple3;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -39,7 +39,7 @@ public class JavaPowerIterationClusteringExample {
 
     @SuppressWarnings("unchecked")
     // $example on$
-    JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
+    JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
       new Tuple3<>(0L, 1L, 0.9),
       new Tuple3<>(1L, 2L, 0.9),
       new Tuple3<>(2L, 3L, 0.9),
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
index 24af5d0180..6998ce2156 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib;
 
 // $example on$
 import java.util.HashMap;
+import java.util.Map;
 
 import scala.Tuple2;
 
@@ -26,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.RandomForest; import org.apache.spark.mllib.tree.model.RandomForestModel; @@ -50,7 +49,7 @@ public class JavaRandomForestClassificationExample { // Train a RandomForest model. // Empty categoricalFeaturesInfo indicates all features are continuous. Integer numClasses = 2; - HashMap categoricalFeaturesInfo = new HashMap<>(); + Map categoricalFeaturesInfo = new HashMap<>(); Integer numTrees = 3; // Use more in practice. String featureSubsetStrategy = "auto"; // Let the algorithm choose. String impurity = "gini"; @@ -58,25 +57,15 @@ public class JavaRandomForestClassificationExample { Integer maxBins = 32; Integer seed = 12345; - final RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses, + RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed); // Evaluate model on test instances and compute test error JavaPairRDD predictionAndLabel = - testData.mapToPair(new PairFunction() { - @Override - public Tuple2 call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testErr = - 1.0 * predictionAndLabel.filter(new Function, Boolean>() { - @Override - public Boolean call(Tuple2 pl) { - return !pl._1().equals(pl._2()); - } - }).count() / testData.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testErr = + predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count(); System.out.println("Test Error: " + testErr); System.out.println("Learned classification forest model:\n" + model.toDebugString()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java index afa9045878..4a0f55f529 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java @@ -23,12 +23,9 @@ import java.util.Map; import scala.Tuple2; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.RandomForest; import org.apache.spark.mllib.tree.model.RandomForestModel; @@ -52,37 +49,23 @@ public class JavaRandomForestRegressionExample { // Set parameters. // Empty categoricalFeaturesInfo indicates all features are continuous. Map categoricalFeaturesInfo = new HashMap<>(); - Integer numTrees = 3; // Use more in practice. + int numTrees = 3; // Use more in practice. String featureSubsetStrategy = "auto"; // Let the algorithm choose. String impurity = "variance"; - Integer maxDepth = 4; - Integer maxBins = 32; - Integer seed = 12345; + int maxDepth = 4; + int maxBins = 32; + int seed = 12345; // Train a RandomForest model. 
- final RandomForestModel model = RandomForest.trainRegressor(trainingData, + RandomForestModel model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed); // Evaluate model on test instances and compute test error JavaPairRDD predictionAndLabel = - testData.mapToPair(new PairFunction() { - @Override - public Tuple2 call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testMSE = - predictionAndLabel.map(new Function, Double>() { - @Override - public Double call(Tuple2 pl) { - Double diff = pl._1() - pl._2(); - return diff * diff; - } - }).reduce(new Function2() { - @Override - public Double call(Double a, Double b) { - return a + b; - } - }) / testData.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testMSE = predictionAndLabel.mapToDouble(pl -> { + double diff = pl._1() - pl._2(); + return diff * diff; + }).mean(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression forest model:\n" + model.toDebugString()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java index 54dfc404ca..bd49f059b2 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java @@ -23,7 +23,6 @@ import java.util.*; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.evaluation.RegressionMetrics; import org.apache.spark.mllib.evaluation.RankingMetrics; import org.apache.spark.mllib.recommendation.ALS; @@ -39,93 +38,61 @@ public class JavaRankingMetricsExample { // $example on$ String path = "data/mllib/sample_movielens_data.txt"; JavaRDD data = sc.textFile(path); - JavaRDD ratings = data.map( - new Function() { - @Override - public Rating call(String line) { - String[] parts = line.split("::"); - return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double - .parseDouble(parts[2]) - 2.5); - } - } - ); + JavaRDD ratings = data.map(line -> { + String[] parts = line.split("::"); + return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double + .parseDouble(parts[2]) - 2.5); + }); ratings.cache(); // Train an ALS model - final MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01); + MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01); // Get top 10 recommendations for every user and scale ratings from 0 to 1 JavaRDD> userRecs = model.recommendProductsForUsers(10).toJavaRDD(); - JavaRDD> userRecsScaled = userRecs.map( - new Function, Tuple2>() { - @Override - public Tuple2 call(Tuple2 t) { - Rating[] scaledRatings = new Rating[t._2().length]; - for (int i = 0; i < scaledRatings.length; i++) { - double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0); - scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating); - } - return new Tuple2<>(t._1(), scaledRatings); + JavaRDD> userRecsScaled = userRecs.map(t -> { + Rating[] scaledRatings = new Rating[t._2().length]; + for (int i = 0; i < scaledRatings.length; i++) { + double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0); + scaledRatings[i] = new 
Rating(t._2()[i].user(), t._2()[i].product(), newRating); } - } - ); + return new Tuple2<>(t._1(), scaledRatings); + }); JavaPairRDD userRecommended = JavaPairRDD.fromJavaRDD(userRecsScaled); // Map ratings to 1 or 0, 1 indicating a movie that should be recommended - JavaRDD binarizedRatings = ratings.map( - new Function() { - @Override - public Rating call(Rating r) { - double binaryRating; - if (r.rating() > 0.0) { - binaryRating = 1.0; - } else { - binaryRating = 0.0; - } - return new Rating(r.user(), r.product(), binaryRating); + JavaRDD binarizedRatings = ratings.map(r -> { + double binaryRating; + if (r.rating() > 0.0) { + binaryRating = 1.0; + } else { + binaryRating = 0.0; } - } - ); + return new Rating(r.user(), r.product(), binaryRating); + }); // Group ratings by common user - JavaPairRDD> userMovies = binarizedRatings.groupBy( - new Function() { - @Override - public Object call(Rating r) { - return r.user(); - } - } - ); + JavaPairRDD> userMovies = binarizedRatings.groupBy(Rating::user); // Get true relevant documents from all user ratings - JavaPairRDD> userMoviesList = userMovies.mapValues( - new Function, List>() { - @Override - public List call(Iterable docs) { - List products = new ArrayList<>(); - for (Rating r : docs) { - if (r.rating() > 0.0) { - products.add(r.product()); - } + JavaPairRDD> userMoviesList = userMovies.mapValues(docs -> { + List products = new ArrayList<>(); + for (Rating r : docs) { + if (r.rating() > 0.0) { + products.add(r.product()); } - return products; } - } - ); + return products; + }); // Extract the product id from each recommendation - JavaPairRDD> userRecommendedList = userRecommended.mapValues( - new Function>() { - @Override - public List call(Rating[] docs) { - List products = new ArrayList<>(); - for (Rating r : docs) { - products.add(r.product()); - } - return products; + JavaPairRDD> userRecommendedList = userRecommended.mapValues(docs -> { + List products = new ArrayList<>(); + for (Rating r : docs) { + products.add(r.product()); } - } - ); + return products; + }); JavaRDD, List>> relevantDocs = userMoviesList.join( userRecommendedList).values(); @@ -143,33 +110,15 @@ public class JavaRankingMetricsExample { System.out.format("Mean average precision = %f\n", metrics.meanAveragePrecision()); // Evaluate the model using numerical ratings and regression metrics - JavaRDD> userProducts = ratings.map( - new Function>() { - @Override - public Tuple2 call(Rating r) { - return new Tuple2(r.user(), r.product()); - } - } - ); + JavaRDD> userProducts = + ratings.map(r -> new Tuple2<>(r.user(), r.product())); + JavaPairRDD, Object> predictions = JavaPairRDD.fromJavaRDD( - model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( - new Function, Object>>() { - @Override - public Tuple2, Object> call(Rating r) { - return new Tuple2, Object>( - new Tuple2<>(r.user(), r.product()), r.rating()); - } - } - )); + model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(r -> + new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))); JavaRDD> ratesAndPreds = - JavaPairRDD.fromJavaRDD(ratings.map( - new Function, Object>>() { - @Override - public Tuple2, Object> call(Rating r) { - return new Tuple2, Object>( - new Tuple2<>(r.user(), r.product()), r.rating()); - } - } + JavaPairRDD.fromJavaRDD(ratings.map(r -> + new Tuple2, Object>(new Tuple2<>(r.user(), r.product()), r.rating()) )).join(predictions).values(); // Create regression metrics object diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java 
b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java index f69aa4b75a..1ee68da35e 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java @@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.ALS; import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; import org.apache.spark.mllib.recommendation.Rating; @@ -37,15 +36,12 @@ public class JavaRecommendationExample { // Load and parse the data String path = "data/mllib/als/test.data"; JavaRDD data = jsc.textFile(path); - JavaRDD ratings = data.map( - new Function() { - public Rating call(String s) { - String[] sarray = s.split(","); - return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), - Double.parseDouble(sarray[2])); - } - } - ); + JavaRDD ratings = data.map(s -> { + String[] sarray = s.split(","); + return new Rating(Integer.parseInt(sarray[0]), + Integer.parseInt(sarray[1]), + Double.parseDouble(sarray[2])); + }); // Build the recommendation model using ALS int rank = 10; @@ -53,37 +49,19 @@ public class JavaRecommendationExample { MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); // Evaluate the model on rating data - JavaRDD> userProducts = ratings.map( - new Function>() { - public Tuple2 call(Rating r) { - return new Tuple2(r.user(), r.product()); - } - } - ); + JavaRDD> userProducts = + ratings.map(r -> new Tuple2<>(r.user(), r.product())); JavaPairRDD, Double> predictions = JavaPairRDD.fromJavaRDD( - model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( - new Function, Double>>() { - public Tuple2, Double> call(Rating r){ - return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()); - } - } - )); - JavaRDD> ratesAndPreds = - JavaPairRDD.fromJavaRDD(ratings.map( - new Function, Double>>() { - public Tuple2, Double> call(Rating r){ - return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()); - } - } - )).join(predictions).values(); - double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map( - new Function, Object>() { - public Object call(Tuple2 pair) { - Double err = pair._1() - pair._2(); - return err * err; - } - } - ).rdd()).mean(); + model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD() + .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())) + ); + JavaRDD> ratesAndPreds = JavaPairRDD.fromJavaRDD( + ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))) + .join(predictions).values(); + double MSE = ratesAndPreds.mapToDouble(pair -> { + double err = pair._1() - pair._2(); + return err * err; + }).mean(); System.out.println("Mean Squared Error = " + MSE); // Save and load model diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java index b3e5c04759..7bb9993b84 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java @@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; 
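A note on the mean-squared-error change in the JavaRecommendationExample hunk above: projecting squared errors through mapToDouble yields a JavaDoubleRDD, whose mean() replaces the old map/reduce-then-divide chain (and avoids boxed Double arithmetic). A minimal sketch of the new form, assuming an RDD of (rating, prediction) pairs like the example's ratesAndPreds:

    import scala.Tuple2;
    import org.apache.spark.api.java.JavaRDD;

    class MseSketch {
      static double mse(JavaRDD<Tuple2<Double, Double>> ratesAndPreds) {
        // Squared error per pair as a primitive-double RDD; mean() then
        // stands in for the manual reduce((a, b) -> a + b) / count().
        return ratesAndPreds.mapToDouble(pair -> {
          double err = pair._1() - pair._2();
          return err * err;
        }).mean();
      }
    }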
import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; @@ -38,34 +37,24 @@ public class JavaRegressionMetricsExample { // Load and parse the data String path = "data/mllib/sample_linear_regression_data.txt"; JavaRDD data = sc.textFile(path); - JavaRDD parsedData = data.map( - new Function() { - public LabeledPoint call(String line) { - String[] parts = line.split(" "); - double[] v = new double[parts.length - 1]; - for (int i = 1; i < parts.length - 1; i++) { - v[i - 1] = Double.parseDouble(parts[i].split(":")[1]); - } - return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); - } + JavaRDD parsedData = data.map(line -> { + String[] parts = line.split(" "); + double[] v = new double[parts.length - 1]; + for (int i = 1; i < parts.length - 1; i++) { + v[i - 1] = Double.parseDouble(parts[i].split(":")[1]); } - ); + return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); + }); parsedData.cache(); // Building the model int numIterations = 100; - final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), + LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); // Evaluate model on training examples and compute training error - JavaRDD> valuesAndPreds = parsedData.map( - new Function>() { - public Tuple2 call(LabeledPoint point) { - double prediction = model.predict(point.features()); - return new Tuple2(prediction, point.label()); - } - } - ); + JavaPairRDD valuesAndPreds = parsedData.mapToPair(point -> + new Tuple2<>(model.predict(point.features()), point.label())); // Instantiate metrics object RegressionMetrics metrics = new RegressionMetrics(valuesAndPreds.rdd()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java index 720b167b2c..866a221fdb 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java @@ -24,7 +24,6 @@ import org.apache.spark.SparkContext; import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.SVMModel; import org.apache.spark.mllib.classification.SVMWithSGD; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; @@ -50,20 +49,14 @@ public class JavaSVMWithSGDExample { // Run training algorithm to build the model. int numIterations = 100; - final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); + SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); // Clear the default threshold. model.clearThreshold(); // Compute raw scores on the test set. - JavaRDD> scoreAndLabels = test.map( - new Function>() { - public Tuple2 call(LabeledPoint p) { - Double score = model.predict(p.features()); - return new Tuple2(score, p.label()); - } - } - ); + JavaRDD> scoreAndLabels = test.map(p -> + new Tuple2<>(model.predict(p.features()), p.label())); // Get evaluation metrics. 
BinaryClassificationMetrics metrics = diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java index 7f4fe60042..f9198e75c2 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java @@ -23,9 +23,6 @@ import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -// $example off$ -import org.apache.spark.api.java.function.Function; -// $example on$ import org.apache.spark.mllib.fpm.AssociationRules; import org.apache.spark.mllib.fpm.FPGrowth; import org.apache.spark.mllib.fpm.FPGrowthModel; @@ -42,14 +39,7 @@ public class JavaSimpleFPGrowth { // $example on$ JavaRDD data = sc.textFile("data/mllib/sample_fpgrowth.txt"); - JavaRDD> transactions = data.map( - new Function>() { - public List call(String line) { - String[] parts = line.split(" "); - return Arrays.asList(parts); - } - } - ); + JavaRDD> transactions = data.map(line -> Arrays.asList(line.split(" "))); FPGrowth fpg = new FPGrowth() .setMinSupport(0.2) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index cfaa577b51..4be702c2ba 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -17,10 +17,6 @@ package org.apache.spark.examples.mllib; - -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; // $example on$ import org.apache.spark.mllib.stat.test.BinarySample; import org.apache.spark.mllib.stat.test.StreamingTest; @@ -75,16 +71,12 @@ public class JavaStreamingTestExample { ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString()); // $example on$ - JavaDStream data = ssc.textFileStream(dataDir).map( - new Function() { - @Override - public BinarySample call(String line) { - String[] ts = line.split(","); - boolean label = Boolean.parseBoolean(ts[0]); - double value = Double.parseDouble(ts[1]); - return new BinarySample(label, value); - } - }); + JavaDStream data = ssc.textFileStream(dataDir).map(line -> { + String[] ts = line.split(","); + boolean label = Boolean.parseBoolean(ts[0]); + double value = Double.parseDouble(ts[1]); + return new BinarySample(label, value); + }); StreamingTest streamingTest = new StreamingTest() .setPeacePeriod(0) @@ -98,21 +90,11 @@ public class JavaStreamingTestExample { // Stop processing if test becomes significant or we time out timeoutCounter = numBatchesTimeout; - out.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - timeoutCounter -= 1; - - boolean anySignificant = !rdd.filter(new Function() { - @Override - public Boolean call(StreamingTestResult v) { - return v.pValue() < 0.05; - } - }).isEmpty(); - - if (timeoutCounter <= 0 || anySignificant) { - rdd.context().stop(); - } + out.foreachRDD(rdd -> { + timeoutCounter -= 1; + boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty(); + if (timeoutCounter <= 0 || anySignificant) { + rdd.context().stop(); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index b687fae5a1..adb96dd8bf 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -139,11 +139,9 @@ public class JavaSQLDataSourceExample { // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile"); Dataset namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); - Dataset namesDS = namesDF.map(new MapFunction() { - public String call(Row row) { - return "Name: " + row.getString(0); - } - }, Encoders.STRING()); + Dataset namesDS = namesDF.map( + (MapFunction) row -> "Name: " + row.getString(0), + Encoders.STRING()); namesDS.show(); // +------------+ // | value| diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java index c5770d147a..8605852d08 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java @@ -227,12 +227,9 @@ public class JavaSparkSQLExample { // Encoders for most common types are provided in class Encoders Encoder integerEncoder = Encoders.INT(); Dataset primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); - Dataset transformedDS = primitiveDS.map(new MapFunction() { - @Override - public Integer call(Integer value) throws Exception { - return value + 1; - } - }, integerEncoder); + Dataset transformedDS = primitiveDS.map( + (MapFunction) value -> value + 1, + integerEncoder); transformedDS.collect(); // Returns [2, 3, 4] // DataFrames can be converted to a Dataset by providing a class. 
Mapping based on name @@ -255,15 +252,12 @@ public class JavaSparkSQLExample { JavaRDD peopleRDD = spark.read() .textFile("examples/src/main/resources/people.txt") .javaRDD() - .map(new Function() { - @Override - public Person call(String line) throws Exception { - String[] parts = line.split(","); - Person person = new Person(); - person.setName(parts[0]); - person.setAge(Integer.parseInt(parts[1].trim())); - return person; - } + .map(line -> { + String[] parts = line.split(","); + Person person = new Person(); + person.setName(parts[0]); + person.setAge(Integer.parseInt(parts[1].trim())); + return person; }); // Apply a schema to an RDD of JavaBeans to get a DataFrame @@ -276,12 +270,9 @@ public class JavaSparkSQLExample { // The columns of a row in the result can be accessed by field index Encoder stringEncoder = Encoders.STRING(); - Dataset teenagerNamesByIndexDF = teenagersDF.map(new MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getString(0); - } - }, stringEncoder); + Dataset teenagerNamesByIndexDF = teenagersDF.map( + (MapFunction) row -> "Name: " + row.getString(0), + stringEncoder); teenagerNamesByIndexDF.show(); // +------------+ // | value| @@ -290,12 +281,9 @@ public class JavaSparkSQLExample { // or by field name - Dataset teenagerNamesByFieldDF = teenagersDF.map(new MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getAs("name"); - } - }, stringEncoder); + Dataset teenagerNamesByFieldDF = teenagersDF.map( + (MapFunction) row -> "Name: " + row.getAs("name"), + stringEncoder); teenagerNamesByFieldDF.show(); // +------------+ // | value| @@ -324,12 +312,9 @@ public class JavaSparkSQLExample { StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows - JavaRDD rowRDD = peopleRDD.map(new Function() { - @Override - public Row call(String record) throws Exception { - String[] attributes = record.split(","); - return RowFactory.create(attributes[0], attributes[1].trim()); - } + JavaRDD rowRDD = peopleRDD.map((Function) record -> { + String[] attributes = record.split(","); + return RowFactory.create(attributes[0], attributes[1].trim()); }); // Apply the schema to the RDD @@ -343,12 +328,9 @@ public class JavaSparkSQLExample { // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name - Dataset namesDS = results.map(new MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getString(0); - } - }, Encoders.STRING()); + Dataset namesDS = results.map( + (MapFunction) row -> "Name: " + row.getString(0), + Encoders.STRING()); namesDS.show(); // +-------------+ // | value| diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java index 2fe1307d8e..47638565b1 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java @@ -90,12 +90,9 @@ public class JavaSparkHiveExample { Dataset sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DataFrames are of type Row, which lets you access each column by ordinal.
- Dataset stringsDS = sqlDF.map(new MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Key: " + row.get(0) + ", Value: " + row.get(1); - } - }, Encoders.STRING()); + Dataset stringsDS = sqlDF.map( + (MapFunction) row -> "Key: " + row.get(0) + ", Value: " + row.get(1), + Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java index 0f45cfeca4..4e02719e04 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java @@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import java.util.Arrays; -import java.util.Iterator; /** * Consumes messages from one or more topics in Kafka and does wordcount. @@ -78,12 +77,9 @@ public final class JavaStructuredKafkaWordCount { .as(Encoders.STRING()); // Generate running word count - Dataset wordCounts = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }, Encoders.STRING()).groupBy("value").count(); + Dataset wordCounts = lines.flatMap( + (FlatMapFunction) x -> Arrays.asList(x.split(" ")).iterator(), + Encoders.STRING()).groupBy("value").count(); // Start running the query that prints the running counts to the console StreamingQuery query = wordCounts.writeStream() diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java index 5f342e1ead..3af786978b 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java @@ -21,7 +21,6 @@ import org.apache.spark.sql.*; import org.apache.spark.sql.streaming.StreamingQuery; import java.util.Arrays; -import java.util.Iterator; /** * Counts words in UTF8 encoded, '\n' delimited text received from the network. 
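One detail worth noting in the SQL hunks above: Dataset.map and Dataset.flatMap are overloaded for both Scala function types and the Java MapFunction/FlatMapFunction interfaces, so a bare lambda would be ambiguous to javac; the explicit cast kept by this patch selects the Java overload. A minimal sketch, assuming an existing SparkSession named spark with a view named people registered:

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class DatasetMapSketch {
      static Dataset<String> names(SparkSession spark) {
        Dataset<Row> df = spark.sql("SELECT name FROM people");
        // The (MapFunction<Row, String>) cast resolves the overload; the
        // Encoder describes how to serialize the resulting Dataset<String>.
        return df.map(
            (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
            Encoders.STRING());
      }
    }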
@@ -61,13 +60,9 @@ public final class JavaStructuredNetworkWordCount { .load(); // Split the lines into words - Dataset words = lines.as(Encoders.STRING()) - .flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }, Encoders.STRING()); + Dataset words = lines.as(Encoders.STRING()).flatMap( + (FlatMapFunction) x -> Arrays.asList(x.split(" ")).iterator(), + Encoders.STRING()); // Generate running word count Dataset wordCounts = words.groupBy("value").count(); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java index 172d053c29..93ec5e2695 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java @@ -18,13 +18,11 @@ package org.apache.spark.examples.sql.streaming; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.functions; import org.apache.spark.sql.streaming.StreamingQuery; import scala.Tuple2; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; /** @@ -86,16 +84,12 @@ public final class JavaStructuredNetworkWordCountWindowed { // Split the lines into words, retaining timestamps Dataset words = lines .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) - .flatMap( - new FlatMapFunction, Tuple2>() { - @Override - public Iterator> call(Tuple2 t) { - List> result = new ArrayList<>(); - for (String word : t._1.split(" ")) { - result.add(new Tuple2<>(word, t._2)); - } - return result.iterator(); + .flatMap((FlatMapFunction, Tuple2>) t -> { + List> result = new ArrayList<>(); + for (String word : t._1.split(" ")) { + result.add(new Tuple2<>(word, t._2)); } + return result.iterator(); }, Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) ).toDF("word", "timestamp"); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index e20b94d5b0..47692ec982 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -20,9 +20,6 @@ package org.apache.spark.examples.streaming; import com.google.common.io.Closeables; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; @@ -38,7 +35,6 @@ import java.net.ConnectException; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; /** @@ -74,23 +70,9 @@ public class JavaCustomReceiver extends Receiver { // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaReceiverInputDStream lines = ssc.receiverStream( new JavaCustomReceiver(args[0], Integer.parseInt(args[1]))); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); ssc.start(); @@ -108,15 +90,13 @@ public class JavaCustomReceiver extends Receiver { port = port_; } + @Override public void onStart() { // Start the thread that receives data over a connection - new Thread() { - @Override public void run() { - receive(); - } - }.start(); + new Thread(this::receive).start(); } + @Override public void onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false @@ -127,13 +107,13 @@ public class JavaCustomReceiver extends Receiver { try { Socket socket = null; BufferedReader reader = null; - String userInput = null; try { // connect to the server socket = new Socket(host, port); reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading + String userInput; while (!isStopped() && (userInput = reader.readLine()) != null) { System.out.println("Received data '" + userInput + "'"); store(userInput); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index ed118f86c0..5e5ae6213d 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming; import java.util.HashMap; import java.util.HashSet; import java.util.Arrays; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; @@ -30,7 +29,6 @@ import scala.Tuple2; import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.Durations; @@ -82,31 +80,10 @@ public final class JavaDirectKafkaWordCount { ); // Get the lines, split them into words, count the words and print - JavaDStream lines = messages.map(new Function, String>() { - @Override - public String call(Tuple2 tuple2) { - return tuple2._2(); - } - }); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey( - new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream lines = messages.map(Tuple2::_2); + JavaDStream words =
lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); // Start the computation diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java index 33c0a2df2f..0c651049d0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.streaming; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.flume.FlumeUtils; @@ -62,12 +61,7 @@ public final class JavaFlumeEventCount { flumeStream.count(); - flumeStream.count().map(new Function() { - @Override - public String call(Long in) { - return "Received " + in + " flume events."; - } - }).print(); + flumeStream.count().map(in -> "Received " + in + " flume events.").print(); ssc.start(); ssc.awaitTermination(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index 8a5fd53372..ce5acdca92 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.Map; import java.util.HashMap; import java.util.regex.Pattern; @@ -26,10 +25,6 @@ import java.util.regex.Pattern; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -78,32 +73,12 @@ public final class JavaKafkaWordCount { JavaPairReceiverInputDStream messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); - JavaDStream lines = messages.map(new Function, String>() { - @Override - public String call(Tuple2 tuple2) { - return tuple2._2(); - } - }); + JavaDStream lines = messages.map(Tuple2::_2); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); + JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaPairDStream wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); jssc.start(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index 
7a8fe99f48..b217672def 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -18,15 +18,11 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; @@ -66,24 +62,9 @@ public final class JavaNetworkWordCount { // Replication necessary in distributed scenario for fault tolerance. JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); ssc.start(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java index 62413b4606..e86f8ab38a 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java @@ -17,19 +17,15 @@ package org.apache.spark.examples.streaming; - +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import scala.Tuple2; -import com.google.common.collect.Lists; - import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -49,14 +45,14 @@ public final class JavaQueueStream { // Create the queue through which RDDs can be pushed to // a QueueInputDStream - Queue> rddQueue = new LinkedList<>(); // Create and push some RDDs into the queue - List list = Lists.newArrayList(); + List list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { list.add(i); } + Queue> rddQueue = new LinkedList<>(); for (int i = 0; i < 30; i++) { rddQueue.add(ssc.sparkContext().parallelize(list)); } @@ -64,19 +60,9 @@ public final class JavaQueueStream { // Create the QueueInputDStream and use it to do some processing JavaDStream inputStream = ssc.queueStream(rddQueue); JavaPairDStream mappedStream = inputStream.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(Integer i) { - return new Tuple2<>(i % 10, 1); - } - }); JavaPairDStream reducedStream = mappedStream.reduceByKey( - new Function2() { - @Override -
public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + (i1, i2) -> i1 + i2); reducedStream.print(); ssc.start(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index acbc345243..45a876decf 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -18,10 +18,8 @@ package org.apache.spark.examples.streaming; import java.io.File; -import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -30,12 +28,10 @@ import scala.Tuple2; import com.google.common.io.Files; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; @@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint System.out.println("Creating new context"); - final File outputFile = new File(outputPath); + File outputFile = new File(outputPath); if (outputFile.exists()) { outputFile.delete(); } @@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount { // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaReceiverInputDStream lines = ssc.socketTextStream(ip, port); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; + JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); + + wordCounts.foreachRDD((rdd, time) -> { + // Get or register the blacklist Broadcast + Broadcast> blacklist = + JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + LongAccumulator droppedWordsCounter = + JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(wordCount -> { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; } - }); - - wordCounts.foreachRDD(new VoidFunction2, Time>() { - @Override - public void call(JavaPairRDD rdd, Time time) throws IOException { - // Get or register the blacklist Broadcast - final Broadcast> blacklist = - JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); - // Get or register the droppedWordsCounter Accumulator - final LongAccumulator droppedWordsCounter = - JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); - // Use blacklist to drop words and use droppedWordsCounter to count them - String counts = rdd.filter(new Function, Boolean>() { - @Override - public Boolean call(Tuple2 wordCount) { - if (blacklist.value().contains(wordCount._1())) { - droppedWordsCounter.add(wordCount._2()); - return false; - } else { - return true; - } - } - }).collect().toString(); - String output = "Counts at time " + time + " " + counts; - System.out.println(output); - System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); - System.out.println("Appending to " + outputFile.getAbsolutePath()); - Files.append(output + "\n", outputFile, Charset.defaultCharset()); - } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + System.out.println(output); + System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); + System.out.println("Appending to " + outputFile.getAbsolutePath()); + Files.append(output + "\n", outputFile, Charset.defaultCharset()); }); return ssc; @@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount { System.exit(1); } - final String ip = args[0]; - final int port = Integer.parseInt(args[1]); - final String checkpointDirectory = args[2]; - final String outputPath = args[3]; + String ip = args[0]; + int port = Integer.parseInt(args[1]); + String checkpointDirectory = args[2]; + String outputPath = args[3]; // Function to create JavaStreamingContext without any output operations // (used to detect the new context) - Function0 createContextFunc = new Function0() { - @Override - public JavaStreamingContext call() { - return createContext(ip, port, checkpointDirectory, outputPath); - } - }; + Function0 createContextFunc = + () -> createContext(ip, port, 
checkpointDirectory, outputPath); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java index b8e9e125ba..948d1a2111 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java @@ -18,20 +18,15 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -48,7 +43,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; * and then run the example * `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999` */ - public final class JavaSqlNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); @@ -70,39 +64,28 @@ public final class JavaSqlNetworkWordCount { // Replication necessary in distributed scenario for fault tolerance. 
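Also visible in the streaming hunks around this point: passing a two-argument lambda to foreachRDD matches the VoidFunction2 overload, so the batch Time is available without an anonymous class. A minimal sketch, assuming an existing JavaDStream<String> named words:

    import org.apache.spark.streaming.api.java.JavaDStream;

    class ForeachRddSketch {
      static void printBatchSizes(JavaDStream<String> words) {
        // (rdd, time) selects the VoidFunction2 overload; rdd is the batch's
        // JavaRDD<String> and time is the batch's Time stamp.
        words.foreachRDD((rdd, time) ->
            System.out.println("Batch " + time + ": " + rdd.count() + " words"));
      }
    }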
JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); + JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); // Convert RDDs of the words DStream to DataFrame and run SQL query - words.foreachRDD(new VoidFunction2, Time>() { - @Override - public void call(JavaRDD rdd, Time time) { - SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); + words.foreachRDD((rdd, time) -> { + SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); - // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame - JavaRDD rowRDD = rdd.map(new Function() { - @Override - public JavaRecord call(String word) { - JavaRecord record = new JavaRecord(); - record.setWord(word); - return record; - } - }); - Dataset wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); + // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame + JavaRDD rowRDD = rdd.map(word -> { + JavaRecord record = new JavaRecord(); + record.setWord(word); + return record; + }); + Dataset wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); - // Creates a temporary view using the DataFrame - wordsDataFrame.createOrReplaceTempView("words"); + // Creates a temporary view using the DataFrame + wordsDataFrame.createOrReplaceTempView("words"); - // Do word count on table using SQL and print it - Dataset wordCountsDataFrame = - spark.sql("select word, count(*) as total from words group by word"); - System.out.println("========= " + time + "========="); - wordCountsDataFrame.show(); - } + // Do word count on table using SQL and print it + Dataset wordCountsDataFrame = + spark.sql("select word, count(*) as total from words group by word"); + System.out.println("========= " + time + "========="); + wordCountsDataFrame.show(); }); ssc.start(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index ed36df852a..9d8bd7fd11 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -72,32 +71,17 @@ public class JavaStatefulNetworkWordCount { JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); + JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); - JavaPairDStream wordsDstream = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }); + JavaPairDStream wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1)); // Update the cumulative count function Function3, State, Tuple2> mappingFunc = - new Function3, State, Tuple2>() { - @Override - public Tuple2 call(String word, Optional one, - State state) { - int sum = 
one.orElse(0) + (state.exists() ? state.get() : 0); - Tuple2 output = new Tuple2<>(word, sum); - state.update(sum); - return output; - } + (word, one, state) -> { + int sum = one.orElse(0) + (state.exists() ? state.get() : 0); + Tuple2 output = new Tuple2<>(word, sum); + state.update(sum); + return output; + }; // DStream of cumulative counts that get updated in every batch -- cgit v1.2.3
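The diff ends mid-example here; for context, a Function3 lambda like the one above is consumed by mapWithState roughly as in the following sketch. The wiring shown is illustrative rather than part of the diff (the actual example may also seed an initial state RDD via StateSpec.function(...).initialState(...)):

    import scala.Tuple2;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    class StatefulSketch {
      static JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>>
          cumulativeCounts(JavaPairDStream<String, Integer> wordsDstream) {
        // Same shape as the diff's mapping function: add this batch's count
        // to the stored state, emit (word, runningTotal), update the state.
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>
            mappingFunc = (word, one, state) -> {
              int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
              Tuple2<String, Integer> output = new Tuple2<>(word, sum);
              state.update(sum);
              return output;
            };
        // StateSpec.function wraps the lambda for mapWithState.
        return wordsDstream.mapWithState(StateSpec.function(mappingFunc));
      }
    }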