author      Sean Owen <sowen@cloudera.com>    2017-02-19 09:37:56 -0800
committer   Sean Owen <sowen@cloudera.com>    2017-02-19 09:37:56 -0800
commit      de14d35f77071932963a994fac5aec0e5df838a1 (patch)
tree        09c2cc8248e664b46e9e3b57f41ccf377731e40a /examples
parent      ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2 (diff)
[SPARK-19533][EXAMPLES] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?

Convert Java tests to use lambdas, Java 8 features.

## How was this patch tested?

Jenkins tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #16961 from srowen/SPARK-19533.
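For context, the change applied throughout these files replaces anonymous inner classes implementing Spark's Java function interfaces with Java 8 lambdas and method references. Below is a minimal, self-contained sketch of the before/after pattern; it is not part of the commit, and the class name and sample data are purely illustrative.

```java
import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;

public class LambdaConversionSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("LambdaConversionSketch").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<String> words = jsc.parallelize(Arrays.asList("a", "b", "a"));

    // Before: Java 7 style, anonymous inner class implementing PairFunction
    JavaPairRDD<String, Integer> onesOld = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });

    // After: Java 8 lambda, as used throughout this commit
    JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

    // Method reference where the lambda only forwards to an existing method,
    // mirroring changes such as reduceByKey(Stats::merge) in this commit
    counts.foreach(System.out::println);

    spark.stop();
  }
}
```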
Diffstat (limited to 'examples')
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java | 21
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaPageRank.java | 49
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java | 20
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java | 5
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaTC.java | 8
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaWordCount.java | 27
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java | 7
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java | 3
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java | 13
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java | 7
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java | 6
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java | 33
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java | 7
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java | 38
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java | 26
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java | 33
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java | 27
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java | 19
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java | 21
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java | 30
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java | 39
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java | 19
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java | 23
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java | 28
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java | 47
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java | 14
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java | 13
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java | 19
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java | 6
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java | 23
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java | 37
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java | 135
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java | 58
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java | 31
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java | 13
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java | 12
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java | 40
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java | 8
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java | 60
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java | 9
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java | 10
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java | 11
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java | 16
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java | 34
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java | 31
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java | 8
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java | 33
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java | 25
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java | 24
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java | 91
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java | 51
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java | 30
52 files changed, 380 insertions, 1018 deletions
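Several of the mllib examples below also replace a map-to-squared-error / reduce / divide-by-count computation of mean squared error with mapToDouble(...).mean(). A minimal standalone sketch of that pattern follows; it is not part of the commit, and the class name and sample (prediction, label) pairs are illustrative only.

```java
import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MapToDoubleMeanSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("MapToDoubleMeanSketch");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // (prediction, label) pairs standing in for model output
    JavaPairRDD<Double, Double> predictionAndLabel = jsc.parallelizePairs(Arrays.asList(
      new Tuple2<>(1.0, 1.5), new Tuple2<>(2.0, 1.0)));

    // Old pattern: map each pair to its squared error, reduce with (a, b) -> a + b,
    // then divide by the count. New pattern in this commit: mapToDouble(...).mean()
    // expresses the same mean of squared differences more directly.
    JavaDoubleRDD squaredErrors = predictionAndLabel.mapToDouble(pl -> {
      double diff = pl._1() - pl._2();
      return diff * diff;
    });
    double mse = squaredErrors.mean();

    System.out.println("Mean Squared Error = " + mse);
    jsc.stop();
  }
}
```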
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 7775443861..cf12de390f 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -17,18 +17,16 @@
package org.apache.spark.examples;
-import com.google.common.collect.Lists;
import scala.Tuple2;
import scala.Tuple3;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -40,7 +38,7 @@ import java.util.regex.Pattern;
*/
public final class JavaLogQuery {
- public static final List<String> exampleApacheLogs = Lists.newArrayList(
+ public static final List<String> exampleApacheLogs = Arrays.asList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
"Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -109,19 +107,10 @@ public final class JavaLogQuery {
JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
- JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
- @Override
- public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
- return new Tuple2<>(extractKey(s), extractStats(s));
- }
- });
+ JavaPairRDD<Tuple3<String, String, String>, Stats> extracted =
+ dataSet.mapToPair(s -> new Tuple2<>(extractKey(s), extractStats(s)));
- JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
- @Override
- public Stats call(Stats stats, Stats stats2) {
- return stats.merge(stats2);
- }
- });
+ JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(Stats::merge);
List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
for (Tuple2<?,?> t : output) {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index bcc493bdcb..b5b4703932 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -19,7 +19,6 @@ package org.apache.spark.examples;
import java.util.ArrayList;
import java.util.List;
-import java.util.Iterator;
import java.util.regex.Pattern;
import scala.Tuple2;
@@ -28,10 +27,7 @@ import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
/**
@@ -90,52 +86,35 @@ public final class JavaPageRank {
JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
// Loads all URLs from input file and initialize their neighbors.
- JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
- new PairFunction<String, String, String>() {
- @Override
- public Tuple2<String, String> call(String s) {
- String[] parts = SPACES.split(s);
- return new Tuple2<>(parts[0], parts[1]);
- }
- }).distinct().groupByKey().cache();
+ JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
+ String[] parts = SPACES.split(s);
+ return new Tuple2<>(parts[0], parts[1]);
+ }).distinct().groupByKey().cache();
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
- @Override
- public Double call(Iterable<String> rs) {
- return 1.0;
- }
- });
+ JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < Integer.parseInt(args[1]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
- .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
- @Override
- public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
- int urlCount = Iterables.size(s._1);
- List<Tuple2<String, Double>> results = new ArrayList<>();
- for (String n : s._1) {
- results.add(new Tuple2<>(n, s._2() / urlCount));
- }
- return results.iterator();
+ .flatMapToPair(s -> {
+ int urlCount = Iterables.size(s._1());
+ List<Tuple2<String, Double>> results = new ArrayList<>();
+ for (String n : s._1) {
+ results.add(new Tuple2<>(n, s._2() / urlCount));
}
- });
+ return results.iterator();
+ });
// Re-calculates URL ranks based on neighbor contributions.
- ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
- @Override
- public Double call(Double sum) {
- return 0.15 + sum * 0.85;
- }
- });
+ ranks = contribs.reduceByKey(new Sum()).mapValues(sum -> 0.15 + sum * 0.85);
}
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2<?,?> tuple : output) {
- System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
+ System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
}
spark.stop();
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 89855e81f1..cb4b265690 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
@@ -49,19 +47,11 @@ public final class JavaSparkPi {
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
- int count = dataSet.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer integer) {
- double x = Math.random() * 2 - 1;
- double y = Math.random() * 2 - 1;
- return (x * x + y * y <= 1) ? 1 : 0;
- }
- }).reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) {
- return integer + integer2;
- }
- });
+ int count = dataSet.map(integer -> {
+ double x = Math.random() * 2 - 1;
+ double y = Math.random() * 2 - 1;
+ return (x * x + y * y <= 1) ? 1 : 0;
+ }).reduce((integer, integer2) -> integer + integer2);
System.out.println("Pi is roughly " + 4.0 * count / n);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
index 6f899c772e..b0ebedfed6 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
@@ -25,7 +25,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
-
import java.util.Arrays;
import java.util.List;
@@ -50,11 +49,11 @@ public final class JavaStatusTrackerDemo {
.appName(APP_NAME)
.getOrCreate();
- final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Example of implementing a progress reporter for a simple job.
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
- new IdentityWithDelay<Integer>());
+ new IdentityWithDelay<>());
JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
while (!jobFuture.isDone()) {
Thread.sleep(1000); // 1 second
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index f12ca77ed1..bde30b84d6 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -80,13 +80,7 @@ public final class JavaTC {
// the graph to obtain the path (x, z).
// Because join() joins on keys, the edges are stored in reversed order.
- JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
- new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
- return new Tuple2<>(e._2(), e._1());
- }
- });
+ JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1()));
long oldCount;
long nextCount = tc.count();
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 8f18604c07..f1ce1e9585 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -21,13 +21,9 @@ import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -48,28 +44,11 @@ public final class JavaWordCount {
JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
- JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) {
- return Arrays.asList(SPACE.split(s)).iterator();
- }
- });
+ JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
- JavaPairRDD<String, Integer> ones = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- });
+ JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
- JavaPairRDD<String, Integer> counts = ones.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+ JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 739558e81f..33ba668b32 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
@@ -88,11 +87,7 @@ public class JavaALSExample {
// $example on$
JavaRDD<Rating> ratingsRDD = spark
.read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
- .map(new Function<String, Rating>() {
- public Rating call(String str) {
- return Rating.parseRating(str);
- }
- });
+ .map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
index 0f96293f03..9a4722b90c 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java
@@ -32,9 +32,6 @@ import org.apache.spark.sql.SparkSession;
/**
* Java example demonstrating model selection using TrainValidationSplit.
*
- * The example is based on {@link org.apache.spark.examples.ml.JavaSimpleParamsExample}
- * using linear regression.
- *
* Run with
* {{{
* bin/run-example ml.JavaModelSelectionViaTrainValidationSplitExample
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index 004e9b12f6..3f809eba7f 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -69,20 +69,17 @@ public class JavaTokenizerExample {
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
- spark.udf().register("countTokens", new UDF1<WrappedArray<String>, Integer>() {
- @Override
- public Integer call(WrappedArray<String> words) {
- return words.size();
- }
- }, DataTypes.IntegerType);
+ spark.udf().register("countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
- .withColumn("tokens", callUDF("countTokens", col("words"))).show(false);
+ .withColumn("tokens", callUDF("countTokens", col("words")))
+ .show(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
- .withColumn("tokens", callUDF("countTokens", col("words"))).show(false);
+ .withColumn("tokens", callUDF("countTokens", col("words")))
+ .show(false);
// $example off$
spark.stop();
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
index 1922514c87..1ae48be266 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java
@@ -20,10 +20,9 @@ package org.apache.spark.examples.ml;
import org.apache.spark.sql.SparkSession;
// $example on$
+import java.util.Arrays;
import java.util.List;
-import com.google.common.collect.Lists;
-
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
@@ -43,14 +42,14 @@ public class JavaVectorSlicerExample {
.getOrCreate();
// $example on$
- Attribute[] attrs = new Attribute[]{
+ Attribute[] attrs = {
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
- List<Row> data = Lists.newArrayList(
+ List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
index 189560e3fe..5f43603f4f 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java
@@ -38,9 +38,9 @@ public class JavaAssociationRulesExample {
// $example on$
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
- new FreqItemset<String>(new String[] {"a"}, 15L),
- new FreqItemset<String>(new String[] {"b"}, 35L),
- new FreqItemset<String>(new String[] {"a", "b"}, 12L)
+ new FreqItemset<>(new String[] {"a"}, 15L),
+ new FreqItemset<>(new String[] {"b"}, 35L),
+ new FreqItemset<>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
index 12aa14f710..b9d0313c6b 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
import scala.Tuple2;
import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
@@ -46,7 +45,7 @@ public class JavaBinaryClassificationMetricsExample {
JavaRDD<LabeledPoint> test = splits[1];
// Run training algorithm to build the model.
- final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+ LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(2)
.run(training.rdd());
@@ -54,15 +53,8 @@ public class JavaBinaryClassificationMetricsExample {
model.clearThreshold();
// Compute raw scores on the test set.
- JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
- new Function<LabeledPoint, Tuple2<Object, Object>>() {
- @Override
- public Tuple2<Object, Object> call(LabeledPoint p) {
- Double prediction = model.predict(p.features());
- return new Tuple2<Object, Object>(prediction, p.label());
- }
- }
- );
+ JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+ new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
BinaryClassificationMetrics metrics =
@@ -73,32 +65,25 @@ public class JavaBinaryClassificationMetricsExample {
System.out.println("Precision by threshold: " + precision.collect());
// Recall by threshold
- JavaRDD<Tuple2<Object, Object>> recall = metrics.recallByThreshold().toJavaRDD();
+ JavaRDD<?> recall = metrics.recallByThreshold().toJavaRDD();
System.out.println("Recall by threshold: " + recall.collect());
// F Score by threshold
- JavaRDD<Tuple2<Object, Object>> f1Score = metrics.fMeasureByThreshold().toJavaRDD();
+ JavaRDD<?> f1Score = metrics.fMeasureByThreshold().toJavaRDD();
System.out.println("F1 Score by threshold: " + f1Score.collect());
- JavaRDD<Tuple2<Object, Object>> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD();
+ JavaRDD<?> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD();
System.out.println("F2 Score by threshold: " + f2Score.collect());
// Precision-recall curve
- JavaRDD<Tuple2<Object, Object>> prc = metrics.pr().toJavaRDD();
+ JavaRDD<?> prc = metrics.pr().toJavaRDD();
System.out.println("Precision-recall curve: " + prc.collect());
// Thresholds
- JavaRDD<Double> thresholds = precision.map(
- new Function<Tuple2<Object, Object>, Double>() {
- @Override
- public Double call(Tuple2<Object, Object> t) {
- return new Double(t._1().toString());
- }
- }
- );
+ JavaRDD<Double> thresholds = precision.map(t -> Double.parseDouble(t._1().toString()));
// ROC Curve
- JavaRDD<Tuple2<Object, Object>> roc = metrics.roc().toJavaRDD();
+ JavaRDD<?> roc = metrics.roc().toJavaRDD();
System.out.println("ROC curve: " + roc.collect());
// AUPRC
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
index c600094947..f878b55a98 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java
@@ -17,10 +17,9 @@
package org.apache.spark.examples.mllib;
-import java.util.ArrayList;
-
// $example on$
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.List;
// $example off$
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -41,7 +40,7 @@ public class JavaBisectingKMeansExample {
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// $example on$
- ArrayList<Vector> localData = Lists.newArrayList(
+ List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
index ad44acb4cd..ce354af2b5 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java
@@ -19,10 +19,8 @@ package org.apache.spark.examples.mllib;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
// $example on$
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
@@ -42,41 +40,25 @@ public class JavaChiSqSelectorExample {
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
- JavaRDD<LabeledPoint> discretizedData = points.map(
- new Function<LabeledPoint, LabeledPoint>() {
- @Override
- public LabeledPoint call(LabeledPoint lp) {
- final double[] discretizedFeatures = new double[lp.features().size()];
- for (int i = 0; i < lp.features().size(); ++i) {
- discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
- }
- return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
- }
+ JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
+ double[] discretizedFeatures = new double[lp.features().size()];
+ for (int i = 0; i < lp.features().size(); ++i) {
+ discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
- );
+ return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
+ });
// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
- final ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
+ ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
- JavaRDD<LabeledPoint> filteredData = discretizedData.map(
- new Function<LabeledPoint, LabeledPoint>() {
- @Override
- public LabeledPoint call(LabeledPoint lp) {
- return new LabeledPoint(lp.label(), transformer.transform(lp.features()));
- }
- }
- );
+ JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
+ new LabeledPoint(lp.label(), transformer.transform(lp.features())));
// $example off$
System.out.println("filtered data: ");
- filteredData.foreach(new VoidFunction<LabeledPoint>() {
- @Override
- public void call(LabeledPoint labeledPoint) throws Exception {
- System.out.println(labeledPoint.toString());
- }
- });
+ filteredData.foreach(System.out::println);
jsc.stop();
}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
index 66387b9df5..032c168b94 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java
@@ -27,8 +27,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
@@ -53,31 +51,21 @@ class JavaDecisionTreeClassificationExample {
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
- Integer numClasses = 2;
+ int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
- Integer maxDepth = 5;
- Integer maxBins = 32;
+ int maxDepth = 5;
+ int maxBins = 32;
// Train a DecisionTree model for classification.
- final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
+ DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
- testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- Double testErr =
- 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
- @Override
- public Boolean call(Tuple2<Double, Double> pl) {
- return !pl._1().equals(pl._2());
- }
- }).count() / testData.count();
+ testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double testErr =
+ predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
index 904e7f7e95..f222c38fc8 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java
@@ -27,9 +27,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
@@ -56,34 +53,20 @@ class JavaDecisionTreeRegressionExample {
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "variance";
- Integer maxDepth = 5;
- Integer maxBins = 32;
+ int maxDepth = 5;
+ int maxBins = 32;
// Train a DecisionTree model.
- final DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
+ DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
- testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- Double testMSE =
- predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
- @Override
- public Double call(Tuple2<Double, Double> pl) {
- Double diff = pl._1() - pl._2();
- return diff * diff;
- }
- }).reduce(new Function2<Double, Double, Double>() {
- @Override
- public Double call(Double a, Double b) {
- return a + b;
- }
- }) / data.count();
+ testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double testMSE = predictionAndLabel.mapToDouble(pl -> {
+ double diff = pl._1() - pl._2();
+ return diff * diff;
+ }).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
index c8ce6ab284..2d45c6166f 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java
@@ -25,12 +25,10 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// $example off$
-import org.apache.spark.api.java.function.VoidFunction;
public class JavaElementwiseProductExample {
public static void main(String[] args) {
@@ -43,35 +41,18 @@ public class JavaElementwiseProductExample {
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
- final ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
+ ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
- JavaRDD<Vector> transformedData2 = data.map(
- new Function<Vector, Vector>() {
- @Override
- public Vector call(Vector v) {
- return transformer.transform(v);
- }
- }
- );
+ JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
// $example off$
System.out.println("transformedData: ");
- transformedData.foreach(new VoidFunction<Vector>() {
- @Override
- public void call(Vector vector) throws Exception {
- System.out.println(vector.toString());
- }
- });
+ transformedData.foreach(System.out::println);
System.out.println("transformedData2: ");
- transformedData2.foreach(new VoidFunction<Vector>() {
- @Override
- public void call(Vector vector) throws Exception {
- System.out.println(vector.toString());
- }
- });
+ transformedData2.foreach(System.out::println);
jsc.stop();
}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
index 3124411c82..5792e5a71c 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
@@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
@@ -39,18 +38,14 @@ public class JavaGaussianMixtureExample {
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
- JavaRDD<Vector> parsedData = data.map(
- new Function<String, Vector>() {
- public Vector call(String s) {
- String[] sarray = s.trim().split(" ");
- double[] values = new double[sarray.length];
- for (int i = 0; i < sarray.length; i++) {
- values[i] = Double.parseDouble(sarray[i]);
- }
- return Vectors.dense(values);
- }
+ JavaRDD<Vector> parsedData = data.map(s -> {
+ String[] sarray = s.trim().split(" ");
+ double[] values = new double[sarray.length];
+ for (int i = 0; i < sarray.length; i++) {
+ values[i] = Double.parseDouble(sarray[i]);
}
- );
+ return Vectors.dense(values);
+ });
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
index 213949e525..521ee96fbd 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java
@@ -27,8 +27,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
@@ -61,24 +59,13 @@ public class JavaGradientBoostingClassificationExample {
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
- final GradientBoostedTreesModel model =
- GradientBoostedTrees.train(trainingData, boostingStrategy);
+ GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
- testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- Double testErr =
- 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
- @Override
- public Boolean call(Tuple2<Double, Double> pl) {
- return !pl._1().equals(pl._2());
- }
- }).count() / testData.count();
+ testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double testErr =
+ predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification GBT model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
index 78db442dbc..b345d19f59 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java
@@ -24,12 +24,9 @@ import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
@@ -60,30 +57,15 @@ public class JavaGradientBoostingRegressionExample {
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
- final GradientBoostedTreesModel model =
- GradientBoostedTrees.train(trainingData, boostingStrategy);
+ GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
- testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- Double testMSE =
- predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
- @Override
- public Double call(Tuple2<Double, Double> pl) {
- Double diff = pl._1() - pl._2();
- return diff * diff;
- }
- }).reduce(new Function2<Double, Double, Double>() {
- @Override
- public Double call(Double a, Double b) {
- return a + b;
- }
- }) / data.count();
+ testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double testMSE = predictionAndLabel.mapToDouble(pl -> {
+ double diff = pl._1() - pl._2();
+ return diff * diff;
+ }).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression GBT model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
index a30b5f1f73..adebafe4b8 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java
@@ -20,9 +20,6 @@ package org.apache.spark.examples.mllib;
import scala.Tuple2;
import scala.Tuple3;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
@@ -42,14 +39,8 @@ public class JavaIsotonicRegressionExample {
jsc.sc(), "data/mllib/sample_isotonic_regression_libsvm_data.txt").toJavaRDD();
// Create label, feature, weight tuples from input data with weight set to default value 1.0.
- JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(
- new Function<LabeledPoint, Tuple3<Double, Double, Double>>() {
- public Tuple3<Double, Double, Double> call(LabeledPoint point) {
- return new Tuple3<>(new Double(point.label()),
- new Double(point.features().apply(0)), 1.0);
- }
- }
- );
+ JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(point ->
+ new Tuple3<>(point.label(), point.features().apply(0), 1.0));
// Split data into training (60%) and test (40%) sets.
JavaRDD<Tuple3<Double, Double, Double>>[] splits =
@@ -59,29 +50,17 @@ public class JavaIsotonicRegressionExample {
// Create isotonic regression model from training data.
// Isotonic parameter defaults to true so it is only shown for demonstration
- final IsotonicRegressionModel model =
- new IsotonicRegression().setIsotonic(true).run(training);
+ IsotonicRegressionModel model = new IsotonicRegression().setIsotonic(true).run(training);
// Create tuples of predicted and real labels.
- JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(
- new PairFunction<Tuple3<Double, Double, Double>, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(Tuple3<Double, Double, Double> point) {
- Double predictedLabel = model.predict(point._2());
- return new Tuple2<>(predictedLabel, point._1());
- }
- }
- );
+ JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(point ->
+ new Tuple2<>(model.predict(point._2()), point._1()));
// Calculate mean squared error between predicted and real labels.
- Double meanSquaredError = new JavaDoubleRDD(predictionAndLabel.map(
- new Function<Tuple2<Double, Double>, Object>() {
- @Override
- public Object call(Tuple2<Double, Double> pl) {
- return Math.pow(pl._1() - pl._2(), 2);
- }
- }
- ).rdd()).mean();
+ double meanSquaredError = predictionAndLabel.mapToDouble(pl -> {
+ double diff = pl._1() - pl._2();
+ return diff * diff;
+ }).mean();
System.out.println("Mean Squared Error = " + meanSquaredError);
// Save and load model
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
index 2d89c768fc..f17275617a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
@@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
@@ -39,18 +38,14 @@ public class JavaKMeansExample {
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
- JavaRDD<Vector> parsedData = data.map(
- new Function<String, Vector>() {
- public Vector call(String s) {
- String[] sarray = s.split(" ");
- double[] values = new double[sarray.length];
- for (int i = 0; i < sarray.length; i++) {
- values[i] = Double.parseDouble(sarray[i]);
- }
- return Vectors.dense(values);
- }
+ JavaRDD<Vector> parsedData = data.map(s -> {
+ String[] sarray = s.split(" ");
+ double[] values = new double[sarray.length];
+ for (int i = 0; i < sarray.length; i++) {
+ values[i] = Double.parseDouble(sarray[i]);
}
- );
+ return Vectors.dense(values);
+ });
parsedData.cache();
// Cluster the data into two classes using KMeans
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
index f6f91f486f..3fdc03a92a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
@@ -50,12 +49,8 @@ public class JavaLBFGSExample {
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);
// Append 1 into the training data as intercept.
- JavaRDD<Tuple2<Object, Vector>> training = data.map(
- new Function<LabeledPoint, Tuple2<Object, Vector>>() {
- public Tuple2<Object, Vector> call(LabeledPoint p) {
- return new Tuple2<Object, Vector>(p.label(), MLUtils.appendBias(p.features()));
- }
- });
+ JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
+ new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();
// Run training algorithm to build the model.
@@ -77,7 +72,7 @@ public class JavaLBFGSExample {
Vector weightsWithIntercept = result._1();
double[] loss = result._2();
- final LogisticRegressionModel model = new LogisticRegressionModel(
+ LogisticRegressionModel model = new LogisticRegressionModel(
Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
(weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);
@@ -85,13 +80,8 @@ public class JavaLBFGSExample {
model.clearThreshold();
// Compute raw scores on the test set.
- JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
- new Function<LabeledPoint, Tuple2<Object, Object>>() {
- public Tuple2<Object, Object> call(LabeledPoint p) {
- Double score = model.predict(p.features());
- return new Tuple2<Object, Object>(score, p.label());
- }
- });
+ JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
+ new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
BinaryClassificationMetrics metrics =
@@ -99,8 +89,9 @@ public class JavaLBFGSExample {
double auROC = metrics.areaUnderROC();
System.out.println("Loss of each step in training process");
- for (double l : loss)
+ for (double l : loss) {
System.out.println(l);
+ }
System.out.println("Area under ROC = " + auROC);
// $example off$
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
index 578564eeb2..887edf8c21 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
@@ -25,7 +25,6 @@ import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
@@ -44,28 +43,17 @@ public class JavaLatentDirichletAllocationExample {
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
- JavaRDD<Vector> parsedData = data.map(
- new Function<String, Vector>() {
- public Vector call(String s) {
- String[] sarray = s.trim().split(" ");
- double[] values = new double[sarray.length];
- for (int i = 0; i < sarray.length; i++) {
- values[i] = Double.parseDouble(sarray[i]);
- }
- return Vectors.dense(values);
- }
+ JavaRDD<Vector> parsedData = data.map(s -> {
+ String[] sarray = s.trim().split(" ");
+ double[] values = new double[sarray.length];
+ for (int i = 0; i < sarray.length; i++) {
+ values[i] = Double.parseDouble(sarray[i]);
}
- );
+ return Vectors.dense(values);
+ });
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
- JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
- new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
- public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
- return doc_id.swap();
- }
- }
- )
- );
+ JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// Cluster the documents into three topics using LDA
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
index 9ca9a7847c..324a781c1a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
@@ -23,9 +23,8 @@ import org.apache.spark.api.java.JavaSparkContext;
// $example on$
import scala.Tuple2;
-import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -44,43 +43,31 @@ public class JavaLinearRegressionWithSGDExample {
// Load and parse the data
String path = "data/mllib/ridge-data/lpsa.data";
JavaRDD<String> data = sc.textFile(path);
- JavaRDD<LabeledPoint> parsedData = data.map(
- new Function<String, LabeledPoint>() {
- public LabeledPoint call(String line) {
- String[] parts = line.split(",");
- String[] features = parts[1].split(" ");
- double[] v = new double[features.length];
- for (int i = 0; i < features.length - 1; i++) {
- v[i] = Double.parseDouble(features[i]);
- }
- return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
- }
+ JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+ String[] parts = line.split(",");
+ String[] features = parts[1].split(" ");
+ double[] v = new double[features.length];
+ for (int i = 0; i < features.length - 1; i++) {
+ v[i] = Double.parseDouble(features[i]);
}
- );
+ return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+ });
parsedData.cache();
// Building the model
int numIterations = 100;
double stepSize = 0.00000001;
- final LinearRegressionModel model =
+ LinearRegressionModel model =
LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);
// Evaluate model on training examples and compute training error
- JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
- new Function<LabeledPoint, Tuple2<Double, Double>>() {
- public Tuple2<Double, Double> call(LabeledPoint point) {
- double prediction = model.predict(point.features());
- return new Tuple2<>(prediction, point.label());
- }
- }
- );
- double MSE = new JavaDoubleRDD(valuesAndPreds.map(
- new Function<Tuple2<Double, Double>, Object>() {
- public Object call(Tuple2<Double, Double> pair) {
- return Math.pow(pair._1() - pair._2(), 2.0);
- }
- }
- ).rdd()).mean();
+ JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point ->
+ new Tuple2<>(model.predict(point.features()), point.label()));
+
+ double MSE = valuesAndPreds.mapToDouble(pair -> {
+ double diff = pair._1() - pair._2();
+ return diff * diff;
+ }).mean();
System.out.println("training Mean Squared Error = " + MSE);
// Save and load model
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
index 7fc371ec0f..26b8a6e9fa 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
@@ -23,8 +23,8 @@ import org.apache.spark.SparkContext;
// $example on$
import scala.Tuple2;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -49,19 +49,13 @@ public class JavaLogisticRegressionWithLBFGSExample {
JavaRDD<LabeledPoint> test = splits[1];
// Run training algorithm to build the model.
- final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+ LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training.rdd());
// Compute raw scores on the test set.
- JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
- new Function<LabeledPoint, Tuple2<Object, Object>>() {
- public Tuple2<Object, Object> call(LabeledPoint p) {
- Double prediction = model.predict(p.features());
- return new Tuple2<Object, Object>(prediction, p.label());
- }
- }
- );
+ JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+ new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
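These hunks also drop the final modifier from captured locals such as model: since Java 8, a lambda (like an anonymous class) may capture any local that is effectively final, so the keyword is redundant. A small sketch of the rule, using plain java.util.function types rather than Spark's API:

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class EffectivelyFinalSketch {
  public static void main(String[] args) {
    // No 'final' needed: 'threshold' is never reassigned, so it is
    // effectively final and may be captured by the lambda below.
    double threshold = 0.5;
    Predicate<Double> aboveThreshold = score -> score > threshold;
    List<Double> scores = Arrays.asList(0.2, 0.7, 0.9);
    scores.stream().filter(aboveThreshold).forEach(System.out::println);
  }
}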
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
index 2d12bdd2a6..03670383b7 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
import scala.Tuple2;
import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -46,19 +45,13 @@ public class JavaMulticlassClassificationMetricsExample {
JavaRDD<LabeledPoint> test = splits[1];
// Run training algorithm to build the model.
- final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+ LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(3)
.run(training.rdd());
// Compute raw scores on the test set.
- JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
- new Function<LabeledPoint, Tuple2<Object, Object>>() {
- public Tuple2<Object, Object> call(LabeledPoint p) {
- Double prediction = model.predict(p.features());
- return new Tuple2<Object, Object>(prediction, p.label());
- }
- }
- );
+ JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+ new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
index f4ec04b0c6..d80dbe8000 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples.mllib;
// $example on$
import scala.Tuple2;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -41,20 +39,11 @@ public class JavaNaiveBayesExample {
JavaRDD<LabeledPoint>[] tmp = inputData.randomSplit(new double[]{0.6, 0.4});
JavaRDD<LabeledPoint> training = tmp[0]; // training set
JavaRDD<LabeledPoint> test = tmp[1]; // test set
- final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
+ NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
JavaPairRDD<Double, Double> predictionAndLabel =
- test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- double accuracy = predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
- @Override
- public Boolean call(Tuple2<Double, Double> pl) {
- return pl._1().equals(pl._2());
- }
- }).count() / (double) test.count();
+ test.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double accuracy =
+ predictionAndLabel.filter(pl -> pl._1().equals(pl._2())).count() / (double) test.count();
// Save and load model
model.save(jsc.sc(), "target/tmp/myNaiveBayesModel");
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
index 91c3bd72da..5155f182ba 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
@@ -17,9 +17,9 @@
package org.apache.spark.examples.mllib;
-import scala.Tuple3;
+import java.util.Arrays;
-import com.google.common.collect.Lists;
+import scala.Tuple3;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@@ -39,7 +39,7 @@ public class JavaPowerIterationClusteringExample {
@SuppressWarnings("unchecked")
// $example on$
- JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
+ JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
index 24af5d0180..6998ce2156 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib;
// $example on$
import java.util.HashMap;
+import java.util.Map;
import scala.Tuple2;
@@ -26,8 +27,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
@@ -50,7 +49,7 @@ public class JavaRandomForestClassificationExample {
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Integer numClasses = 2;
- HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
+ Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
Integer numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
@@ -58,25 +57,15 @@ public class JavaRandomForestClassificationExample {
Integer maxBins = 32;
Integer seed = 12345;
- final RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
+ RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins,
seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
- testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- Double testErr =
- 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
- @Override
- public Boolean call(Tuple2<Double, Double> pl) {
- return !pl._1().equals(pl._2());
- }
- }).count() / testData.count();
+ testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double testErr =
+ predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification forest model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
index afa9045878..4a0f55f529 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
@@ -23,12 +23,9 @@ import java.util.Map;
import scala.Tuple2;
-import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
@@ -52,37 +49,23 @@ public class JavaRandomForestRegressionExample {
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
- Integer numTrees = 3; // Use more in practice.
+ int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "variance";
- Integer maxDepth = 4;
- Integer maxBins = 32;
- Integer seed = 12345;
+ int maxDepth = 4;
+ int maxBins = 32;
+ int seed = 12345;
// Train a RandomForest model.
- final RandomForestModel model = RandomForest.trainRegressor(trainingData,
+ RandomForestModel model = RandomForest.trainRegressor(trainingData,
categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
- testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
- @Override
- public Tuple2<Double, Double> call(LabeledPoint p) {
- return new Tuple2<>(model.predict(p.features()), p.label());
- }
- });
- Double testMSE =
- predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
- @Override
- public Double call(Tuple2<Double, Double> pl) {
- Double diff = pl._1() - pl._2();
- return diff * diff;
- }
- }).reduce(new Function2<Double, Double, Double>() {
- @Override
- public Double call(Double a, Double b) {
- return a + b;
- }
- }) / testData.count();
+ testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+ double testMSE = predictionAndLabel.mapToDouble(pl -> {
+ double diff = pl._1() - pl._2();
+ return diff * diff;
+ }).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression forest model:\n" + model.toDebugString());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
index 54dfc404ca..bd49f059b2 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
@@ -23,7 +23,6 @@ import java.util.*;
import scala.Tuple2;
import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.evaluation.RegressionMetrics;
import org.apache.spark.mllib.evaluation.RankingMetrics;
import org.apache.spark.mllib.recommendation.ALS;
@@ -39,93 +38,61 @@ public class JavaRankingMetricsExample {
// $example on$
String path = "data/mllib/sample_movielens_data.txt";
JavaRDD<String> data = sc.textFile(path);
- JavaRDD<Rating> ratings = data.map(
- new Function<String, Rating>() {
- @Override
- public Rating call(String line) {
- String[] parts = line.split("::");
- return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
- .parseDouble(parts[2]) - 2.5);
- }
- }
- );
+ JavaRDD<Rating> ratings = data.map(line -> {
+ String[] parts = line.split("::");
+ return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
+ .parseDouble(parts[2]) - 2.5);
+ });
ratings.cache();
// Train an ALS model
- final MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
+ MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
// Get top 10 recommendations for every user and scale ratings from 0 to 1
JavaRDD<Tuple2<Object, Rating[]>> userRecs = model.recommendProductsForUsers(10).toJavaRDD();
- JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(
- new Function<Tuple2<Object, Rating[]>, Tuple2<Object, Rating[]>>() {
- @Override
- public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
- Rating[] scaledRatings = new Rating[t._2().length];
- for (int i = 0; i < scaledRatings.length; i++) {
- double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
- scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
- }
- return new Tuple2<>(t._1(), scaledRatings);
+ JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(t -> {
+ Rating[] scaledRatings = new Rating[t._2().length];
+ for (int i = 0; i < scaledRatings.length; i++) {
+ double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
+ scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
}
- }
- );
+ return new Tuple2<>(t._1(), scaledRatings);
+ });
JavaPairRDD<Object, Rating[]> userRecommended = JavaPairRDD.fromJavaRDD(userRecsScaled);
// Map ratings to 1 or 0, 1 indicating a movie that should be recommended
- JavaRDD<Rating> binarizedRatings = ratings.map(
- new Function<Rating, Rating>() {
- @Override
- public Rating call(Rating r) {
- double binaryRating;
- if (r.rating() > 0.0) {
- binaryRating = 1.0;
- } else {
- binaryRating = 0.0;
- }
- return new Rating(r.user(), r.product(), binaryRating);
+ JavaRDD<Rating> binarizedRatings = ratings.map(r -> {
+ double binaryRating;
+ if (r.rating() > 0.0) {
+ binaryRating = 1.0;
+ } else {
+ binaryRating = 0.0;
}
- }
- );
+ return new Rating(r.user(), r.product(), binaryRating);
+ });
// Group ratings by common user
- JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(
- new Function<Rating, Object>() {
- @Override
- public Object call(Rating r) {
- return r.user();
- }
- }
- );
+ JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(Rating::user);
// Get true relevant documents from all user ratings
- JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(
- new Function<Iterable<Rating>, List<Integer>>() {
- @Override
- public List<Integer> call(Iterable<Rating> docs) {
- List<Integer> products = new ArrayList<>();
- for (Rating r : docs) {
- if (r.rating() > 0.0) {
- products.add(r.product());
- }
+ JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(docs -> {
+ List<Integer> products = new ArrayList<>();
+ for (Rating r : docs) {
+ if (r.rating() > 0.0) {
+ products.add(r.product());
}
- return products;
}
- }
- );
+ return products;
+ });
// Extract the product id from each recommendation
- JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(
- new Function<Rating[], List<Integer>>() {
- @Override
- public List<Integer> call(Rating[] docs) {
- List<Integer> products = new ArrayList<>();
- for (Rating r : docs) {
- products.add(r.product());
- }
- return products;
+ JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(docs -> {
+ List<Integer> products = new ArrayList<>();
+ for (Rating r : docs) {
+ products.add(r.product());
}
- }
- );
+ return products;
+ });
JavaRDD<Tuple2<List<Integer>, List<Integer>>> relevantDocs = userMoviesList.join(
userRecommendedList).values();
@@ -143,33 +110,15 @@ public class JavaRankingMetricsExample {
System.out.format("Mean average precision = %f\n", metrics.meanAveragePrecision());
// Evaluate the model using numerical ratings and regression metrics
- JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
- new Function<Rating, Tuple2<Object, Object>>() {
- @Override
- public Tuple2<Object, Object> call(Rating r) {
- return new Tuple2<Object, Object>(r.user(), r.product());
- }
- }
- );
+ JavaRDD<Tuple2<Object, Object>> userProducts =
+ ratings.map(r -> new Tuple2<>(r.user(), r.product()));
+
JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = JavaPairRDD.fromJavaRDD(
- model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
- new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
- @Override
- public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
- return new Tuple2<Tuple2<Integer, Integer>, Object>(
- new Tuple2<>(r.user(), r.product()), r.rating());
- }
- }
- ));
+ model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(r ->
+ new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));
JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
- JavaPairRDD.fromJavaRDD(ratings.map(
- new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
- @Override
- public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
- return new Tuple2<Tuple2<Integer, Integer>, Object>(
- new Tuple2<>(r.user(), r.product()), r.rating());
- }
- }
+ JavaPairRDD.fromJavaRDD(ratings.map(r ->
+ new Tuple2<Tuple2<Integer, Integer>, Object>(new Tuple2<>(r.user(), r.product()), r.rating())
)).join(predictions).values();
// Create regression metrics object
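binarizedRatings.groupBy(Rating::user) shows that a method reference works anywhere Spark's single-abstract-method org.apache.spark.api.java.function.Function is expected. A self-contained sketch of that pattern on a handful of in-memory Rating objects; the data and the local[*] master are illustrative assumptions:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.Rating;

public class MethodRefSketch {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setAppName("MethodRefSketch").setMaster("local[*]"));
    JavaRDD<Rating> ratings = sc.parallelize(Arrays.asList(
        new Rating(1, 10, 1.0), new Rating(1, 11, 0.0), new Rating(2, 10, 1.0)));
    // Rating::user is shorthand for r -> r.user(); either form implements
    // Spark's Function interface because it has a single abstract method.
    JavaPairRDD<Integer, Iterable<Rating>> byUser = ratings.groupBy(Rating::user);
    byUser.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
    sc.stop();
  }
}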
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
index f69aa4b75a..1ee68da35e 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
import scala.Tuple2;
import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
@@ -37,15 +36,12 @@ public class JavaRecommendationExample {
// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
- JavaRDD<Rating> ratings = data.map(
- new Function<String, Rating>() {
- public Rating call(String s) {
- String[] sarray = s.split(",");
- return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
- Double.parseDouble(sarray[2]));
- }
- }
- );
+ JavaRDD<Rating> ratings = data.map(s -> {
+ String[] sarray = s.split(",");
+ return new Rating(Integer.parseInt(sarray[0]),
+ Integer.parseInt(sarray[1]),
+ Double.parseDouble(sarray[2]));
+ });
// Build the recommendation model using ALS
int rank = 10;
@@ -53,37 +49,19 @@ public class JavaRecommendationExample {
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
// Evaluate the model on rating data
- JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
- new Function<Rating, Tuple2<Object, Object>>() {
- public Tuple2<Object, Object> call(Rating r) {
- return new Tuple2<Object, Object>(r.user(), r.product());
- }
- }
- );
+ JavaRDD<Tuple2<Object, Object>> userProducts =
+ ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
- model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
- new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
- public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
- return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
- }
- }
- ));
- JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
- JavaPairRDD.fromJavaRDD(ratings.map(
- new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
- public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
- return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
- }
- }
- )).join(predictions).values();
- double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
- new Function<Tuple2<Double, Double>, Object>() {
- public Object call(Tuple2<Double, Double> pair) {
- Double err = pair._1() - pair._2();
- return err * err;
- }
- }
- ).rdd()).mean();
+ model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
+ .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
+ );
+ JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
+ ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
+ .join(predictions).values();
+ double MSE = ratesAndPreds.mapToDouble(pair -> {
+ double err = pair._1() - pair._2();
+ return err * err;
+ }).mean();
System.out.println("Mean Squared Error = " + MSE);
// Save and load model
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
index b3e5c04759..7bb9993b84 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
import scala.Tuple2;
import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -38,34 +37,24 @@ public class JavaRegressionMetricsExample {
// Load and parse the data
String path = "data/mllib/sample_linear_regression_data.txt";
JavaRDD<String> data = sc.textFile(path);
- JavaRDD<LabeledPoint> parsedData = data.map(
- new Function<String, LabeledPoint>() {
- public LabeledPoint call(String line) {
- String[] parts = line.split(" ");
- double[] v = new double[parts.length - 1];
- for (int i = 1; i < parts.length - 1; i++) {
- v[i - 1] = Double.parseDouble(parts[i].split(":")[1]);
- }
- return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
- }
+ JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+ String[] parts = line.split(" ");
+ double[] v = new double[parts.length - 1];
+ for (int i = 1; i < parts.length - 1; i++) {
+ v[i - 1] = Double.parseDouble(parts[i].split(":")[1]);
}
- );
+ return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+ });
parsedData.cache();
// Building the model
int numIterations = 100;
- final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData),
+ LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData),
numIterations);
// Evaluate model on training examples and compute training error
- JavaRDD<Tuple2<Object, Object>> valuesAndPreds = parsedData.map(
- new Function<LabeledPoint, Tuple2<Object, Object>>() {
- public Tuple2<Object, Object> call(LabeledPoint point) {
- double prediction = model.predict(point.features());
- return new Tuple2<Object, Object>(prediction, point.label());
- }
- }
- );
+ JavaPairRDD<Object, Object> valuesAndPreds = parsedData.mapToPair(point ->
+ new Tuple2<>(model.predict(point.features()), point.label()));
// Instantiate metrics object
RegressionMetrics metrics = new RegressionMetrics(valuesAndPreds.rdd());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
index 720b167b2c..866a221fdb 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext;
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
@@ -50,20 +49,14 @@ public class JavaSVMWithSGDExample {
// Run training algorithm to build the model.
int numIterations = 100;
- final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
+ SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
// Clear the default threshold.
model.clearThreshold();
// Compute raw scores on the test set.
- JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
- new Function<LabeledPoint, Tuple2<Object, Object>>() {
- public Tuple2<Object, Object> call(LabeledPoint p) {
- Double score = model.predict(p.features());
- return new Tuple2<Object, Object>(score, p.label());
- }
- }
- );
+ JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
+ new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
BinaryClassificationMetrics metrics =
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
index 7f4fe60042..f9198e75c2 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
@@ -23,9 +23,6 @@ import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-// $example off$
-import org.apache.spark.api.java.function.Function;
-// $example on$
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;
@@ -42,14 +39,7 @@ public class JavaSimpleFPGrowth {
// $example on$
JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
- JavaRDD<List<String>> transactions = data.map(
- new Function<String, List<String>>() {
- public List<String> call(String line) {
- String[] parts = line.split(" ");
- return Arrays.asList(parts);
- }
- }
- );
+ JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
FPGrowth fpg = new FPGrowth()
.setMinSupport(0.2)
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index cfaa577b51..4be702c2ba 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -17,10 +17,6 @@
package org.apache.spark.examples.mllib;
-
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
// $example on$
import org.apache.spark.mllib.stat.test.BinarySample;
import org.apache.spark.mllib.stat.test.StreamingTest;
@@ -75,16 +71,12 @@ public class JavaStreamingTestExample {
ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
// $example on$
- JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
- new Function<String, BinarySample>() {
- @Override
- public BinarySample call(String line) {
- String[] ts = line.split(",");
- boolean label = Boolean.parseBoolean(ts[0]);
- double value = Double.parseDouble(ts[1]);
- return new BinarySample(label, value);
- }
- });
+ JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> {
+ String[] ts = line.split(",");
+ boolean label = Boolean.parseBoolean(ts[0]);
+ double value = Double.parseDouble(ts[1]);
+ return new BinarySample(label, value);
+ });
StreamingTest streamingTest = new StreamingTest()
.setPeacePeriod(0)
@@ -98,21 +90,11 @@ public class JavaStreamingTestExample {
// Stop processing if test becomes significant or we time out
timeoutCounter = numBatchesTimeout;
- out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
- @Override
- public void call(JavaRDD<StreamingTestResult> rdd) {
- timeoutCounter -= 1;
-
- boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() {
- @Override
- public Boolean call(StreamingTestResult v) {
- return v.pValue() < 0.05;
- }
- }).isEmpty();
-
- if (timeoutCounter <= 0 || anySignificant) {
- rdd.context().stop();
- }
+ out.foreachRDD(rdd -> {
+ timeoutCounter -= 1;
+ boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty();
+ if (timeoutCounter <= 0 || anySignificant) {
+ rdd.context().stop();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index b687fae5a1..adb96dd8bf 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -139,11 +139,9 @@ public class JavaSQLDataSourceExample {
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
- Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }, Encoders.STRING());
+ Dataset<String> namesDS = namesDF.map(
+ (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+ Encoders.STRING());
namesDS.show();
// +------------+
// | value|
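The (MapFunction<Row, String>) cast is needed because Dataset.map is overloaded for both Scala's Function1 and Java's MapFunction, so a bare lambda would be ambiguous to the compiler. A minimal sketch of the pattern on an assumed in-memory Dataset<String>; the names and the local[*] master are illustrative:

import java.util.Arrays;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class MapFunctionCastSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("MapFunctionCastSketch").master("local[*]").getOrCreate();
    Dataset<String> names = spark.createDataset(
        Arrays.asList("Alice", "Bob"), Encoders.STRING());
    // Without the cast the compiler cannot decide between the Scala and Java
    // map overloads; the cast selects map(MapFunction, Encoder).
    Dataset<String> greeted = names.map(
        (MapFunction<String, String>) name -> "Name: " + name,
        Encoders.STRING());
    greeted.show();
    spark.stop();
  }
}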
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
index c5770d147a..8605852d08 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -227,12 +227,9 @@ public class JavaSparkSQLExample {
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
- Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
- @Override
- public Integer call(Integer value) throws Exception {
- return value + 1;
- }
- }, integerEncoder);
+ Dataset<Integer> transformedDS = primitiveDS.map(
+ (MapFunction<Integer, Integer>) value -> value + 1,
+ integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
@@ -255,15 +252,12 @@ public class JavaSparkSQLExample {
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
- .map(new Function<String, Person>() {
- @Override
- public Person call(String line) throws Exception {
- String[] parts = line.split(",");
- Person person = new Person();
- person.setName(parts[0]);
- person.setAge(Integer.parseInt(parts[1].trim()));
- return person;
- }
+ .map(line -> {
+ String[] parts = line.split(",");
+ Person person = new Person();
+ person.setName(parts[0]);
+ person.setAge(Integer.parseInt(parts[1].trim()));
+ return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
@@ -276,12 +270,9 @@ public class JavaSparkSQLExample {
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
- Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
- @Override
- public String call(Row row) throws Exception {
- return "Name: " + row.getString(0);
- }
- }, stringEncoder);
+ Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
+ (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+ stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
@@ -290,12 +281,9 @@ public class JavaSparkSQLExample {
// +------------+
// or by field name
- Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
- @Override
- public String call(Row row) throws Exception {
- return "Name: " + row.<String>getAs("name");
- }
- }, stringEncoder);
+ Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
+ (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
+ stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
@@ -324,12 +312,9 @@ public class JavaSparkSQLExample {
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
- JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
- @Override
- public Row call(String record) throws Exception {
- String[] attributes = record.split(",");
- return RowFactory.create(attributes[0], attributes[1].trim());
- }
+ JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
+ String[] attributes = record.split(",");
+ return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
@@ -343,12 +328,9 @@ public class JavaSparkSQLExample {
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
- Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
- @Override
- public String call(Row row) throws Exception {
- return "Name: " + row.getString(0);
- }
- }, Encoders.STRING());
+ Dataset<String> namesDS = results.map(
+ (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+ Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
index 2fe1307d8e..47638565b1 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -90,12 +90,9 @@ public class JavaSparkHiveExample {
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you access each column by ordinal.
- Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
- @Override
- public String call(Row row) throws Exception {
- return "Key: " + row.get(0) + ", Value: " + row.get(1);
- }
- }, Encoders.STRING());
+ Dataset<String> stringsDS = sqlDF.map(
+ (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
+ Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
index 0f45cfeca4..4e02719e04 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
-import java.util.Iterator;
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -78,12 +77,9 @@ public final class JavaStructuredKafkaWordCount {
.as(Encoders.STRING());
// Generate running word count
- Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING()).groupBy("value").count();
+ Dataset<Row> wordCounts = lines.flatMap(
+ (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+ Encoders.STRING()).groupBy("value").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 5f342e1ead..3af786978b 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -21,7 +21,6 @@ import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
-import java.util.Iterator;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network.
@@ -61,13 +60,9 @@ public final class JavaStructuredNetworkWordCount {
.load();
// Split the lines into words
- Dataset<String> words = lines.as(Encoders.STRING())
- .flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING());
+ Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
+ (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+ Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 172d053c29..93ec5e2695 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -18,13 +18,11 @@ package org.apache.spark.examples.sql.streaming;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
-import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import scala.Tuple2;
import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
/**
@@ -86,16 +84,12 @@ public final class JavaStructuredNetworkWordCountWindowed {
// Split the lines into words, retaining timestamps
Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
- .flatMap(
- new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
- @Override
- public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
- List<Tuple2<String, Timestamp>> result = new ArrayList<>();
- for (String word : t._1.split(" ")) {
- result.add(new Tuple2<>(word, t._2));
- }
- return result.iterator();
+ .flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
+ List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+ for (String word : t._1.split(" ")) {
+ result.add(new Tuple2<>(word, t._2));
}
+ return result.iterator();
},
Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
).toDF("word", "timestamp");
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index e20b94d5b0..47692ec982 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -20,9 +20,6 @@ package org.apache.spark.examples.streaming;
import com.google.common.io.Closeables;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -38,7 +35,6 @@ import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.regex.Pattern;
/**
@@ -74,23 +70,9 @@ public class JavaCustomReceiver extends Receiver<String> {
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.receiverStream(
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+ .reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
ssc.start();
@@ -108,15 +90,13 @@ public class JavaCustomReceiver extends Receiver<String> {
port = port_;
}
+ @Override
public void onStart() {
// Start the thread that receives data over a connection
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
+ new Thread(this::receive).start();
}
+ @Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
@@ -127,13 +107,13 @@ public class JavaCustomReceiver extends Receiver<String> {
try {
Socket socket = null;
BufferedReader reader = null;
- String userInput = null;
try {
// connect to the server
socket = new Socket(host, port);
reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// Until stopped or connection broken continue reading
+ String userInput;
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
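new Thread(this::receive).start() relies on the fact that a no-argument void instance method matches Runnable's single abstract method. A tiny standalone sketch; the class and method names are illustrative, not Spark API:

public class ReceiverSketch {
  // A no-arg, void instance method fits Runnable, so this::receive can be
  // handed straight to the Thread constructor instead of subclassing Thread.
  private void receive() {
    System.out.println("receiving on " + Thread.currentThread().getName());
  }

  public void onStart() {
    new Thread(this::receive).start();
  }

  public static void main(String[] args) {
    new ReceiverSketch().onStart();
  }
}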
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index ed118f86c0..5e5ae6213d 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@@ -30,7 +29,6 @@ import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
@@ -82,31 +80,10 @@ public final class JavaDirectKafkaWordCount {
);
// Get the lines, split them into words, count the words and print
- JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- });
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+ JavaDStream<String> lines = messages.map(Tuple2::_2);
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+ .reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
// Start the computation
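With Tuple2::_2 and lambdas, the whole word-count pipeline reads as a few chained calls. A sketch of the same chain on a plain JavaPairRDD, with assumed in-memory (topic, message) pairs standing in for the Kafka stream and a local[*] master:

import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCountLambdaSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("WordCountLambdaSketch").setMaster("local[*]"));
    // Stand-in for the (topic, message) pairs delivered by the Kafka stream
    JavaPairRDD<String, String> messages = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("topic", "to be or not to be"),
        new Tuple2<>("topic", "that is the question")));
    // Tuple2::_2 extracts the message text, exactly as in the DStream version
    JavaRDD<String> lines = messages.map(Tuple2::_2);
    JavaPairRDD<String, Integer> wordCounts = lines
        .flatMap(x -> Arrays.asList(x.split(" ")).iterator())
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);
    wordCounts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
    sc.stop();
  }
}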
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index 33c0a2df2f..0c651049d0 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -18,7 +18,6 @@
package org.apache.spark.examples.streaming;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
@@ -62,12 +61,7 @@ public final class JavaFlumeEventCount {
flumeStream.count();
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
+ flumeStream.count().map(in -> "Received " + in + " flume events.").print();
ssc.start();
ssc.awaitTermination();
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 8a5fd53372..ce5acdca92 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -18,7 +18,6 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
@@ -26,10 +25,6 @@ import java.util.regex.Pattern;
import scala.Tuple2;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -78,32 +73,12 @@ public final class JavaKafkaWordCount {
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
- JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- });
+ JavaDStream<String> lines = messages.map(Tuple2::_2);
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+ .reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 7a8fe99f48..b217672def 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -18,15 +18,11 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.regex.Pattern;
import scala.Tuple2;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -66,24 +62,9 @@ public final class JavaNetworkWordCount {
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+ .reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
ssc.start();
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
index 62413b4606..e86f8ab38a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -17,19 +17,15 @@
package org.apache.spark.examples.streaming;
-
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import scala.Tuple2;
-import com.google.common.collect.Lists;
-
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -49,14 +45,14 @@ public final class JavaQueueStream {
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
- Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
// Create and push some RDDs into the queue
- List<Integer> list = Lists.newArrayList();
+ List<Integer> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(i);
}
+ Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
for (int i = 0; i < 30; i++) {
rddQueue.add(ssc.sparkContext().parallelize(list));
}
@@ -64,19 +60,9 @@ public final class JavaQueueStream {
// Create the QueueInputDStream and use it do some processing
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i % 10, 1);
- }
- });
+ i -> new Tuple2<>(i % 10, 1));
JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+ (i1, i2) -> i1 + i2);
reducedStream.print();
ssc.start();
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index acbc345243..45a876decf 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -18,10 +18,8 @@
package org.apache.spark.examples.streaming;
import java.io.File;
-import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -30,12 +28,10 @@ import scala.Tuple2;
import com.google.common.io.Files;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
System.out.println("Creating new context");
- final File outputFile = new File(outputPath);
+ File outputFile = new File(outputPath);
if (outputFile.exists()) {
outputFile.delete();
}
@@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount {
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+ .reduceByKey((i1, i2) -> i1 + i2);
+
+ wordCounts.foreachRDD((rdd, time) -> {
+ // Get or register the blacklist Broadcast
+ Broadcast<List<String>> blacklist =
+ JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+ // Get or register the droppedWordsCounter Accumulator
+ LongAccumulator droppedWordsCounter =
+ JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+ // Use blacklist to drop words and use droppedWordsCounter to count them
+ String counts = rdd.filter(wordCount -> {
+ if (blacklist.value().contains(wordCount._1())) {
+ droppedWordsCounter.add(wordCount._2());
+ return false;
+ } else {
+ return true;
}
- });
-
- wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
- @Override
- public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
- // Get or register the blacklist Broadcast
- final Broadcast<List<String>> blacklist =
- JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
- // Get or register the droppedWordsCounter Accumulator
- final LongAccumulator droppedWordsCounter =
- JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
- // Use blacklist to drop words and use droppedWordsCounter to count them
- String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, Integer> wordCount) {
- if (blacklist.value().contains(wordCount._1())) {
- droppedWordsCounter.add(wordCount._2());
- return false;
- } else {
- return true;
- }
- }
- }).collect().toString();
- String output = "Counts at time " + time + " " + counts;
- System.out.println(output);
- System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
- System.out.println("Appending to " + outputFile.getAbsolutePath());
- Files.append(output + "\n", outputFile, Charset.defaultCharset());
- }
+ }).collect().toString();
+ String output = "Counts at time " + time + " " + counts;
+ System.out.println(output);
+ System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
+ System.out.println("Appending to " + outputFile.getAbsolutePath());
+ Files.append(output + "\n", outputFile, Charset.defaultCharset());
});
return ssc;
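
For reference, the lambda-based filter in the hunk above (a broadcast blacklist plus a LongAccumulator that counts dropped occurrences) can be exercised on its own. The following is a minimal, self-contained sketch that runs the same pattern against a plain batch RDD so no socket or checkpoint is needed; the class name BlacklistFilterSketch and the sample data are illustrative only and not part of this patch.

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;

public final class BlacklistFilterSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("BlacklistFilterSketch").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Stand-ins for the getInstance() singletons used in the streaming example.
    Broadcast<List<String>> blacklist = sc.broadcast(Arrays.asList("a", "b"));
    LongAccumulator droppedWordsCounter = sc.sc().longAccumulator("droppedWords");

    JavaPairRDD<String, Integer> wordCounts = sc
        .parallelize(Arrays.asList("a", "b", "c", "c", "d"))
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);

    // Drop blacklisted words; count how many occurrences were dropped.
    String counts = wordCounts.filter(wordCount -> {
      if (blacklist.value().contains(wordCount._1())) {
        droppedWordsCounter.add(wordCount._2());
        return false;
      }
      return true;
    }).collect().toString();

    System.out.println("Counts: " + counts);
    System.out.println("Dropped " + droppedWordsCounter.value() + " word(s)");
    sc.stop();
  }
}
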
@@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount {
System.exit(1);
}
- final String ip = args[0];
- final int port = Integer.parseInt(args[1]);
- final String checkpointDirectory = args[2];
- final String outputPath = args[3];
+ String ip = args[0];
+ int port = Integer.parseInt(args[1]);
+ String checkpointDirectory = args[2];
+ String outputPath = args[3];
// Function to create JavaStreamingContext without any output operations
// (used to detect the new context)
- Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
- @Override
- public JavaStreamingContext call() {
- return createContext(ip, port, checkpointDirectory, outputPath);
- }
- };
+ Function0<JavaStreamingContext> createContextFunc =
+ () -> createContext(ip, port, checkpointDirectory, outputPath);
JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
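
The getOrCreate recovery pattern touched by the hunk above can also be sketched in isolation. Below is a minimal, hedged example assuming a placeholder checkpoint directory and a netcat server on localhost:9999, as in the surrounding examples; with Java 8 the Function0 factory becomes a zero-argument lambda that is only invoked when no checkpoint data exists yet.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class CheckpointRecoverySketch {

  private static JavaStreamingContext createContext(String checkpointDirectory) {
    // Only reached when no checkpoint exists; on restart the context is rebuilt from it.
    System.out.println("Creating new context");
    SparkConf conf = new SparkConf().setAppName("CheckpointRecoverySketch").setMaster("local[2]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    ssc.checkpoint(checkpointDirectory);

    // Minimal output operation so the context can be started.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    lines.print();
    return ssc;
  }

  public static void main(String[] args) throws InterruptedException {
    String checkpointDirectory = "/tmp/recovery-sketch-checkpoint";  // placeholder path

    // The factory is a zero-argument lambda instead of an anonymous Function0.
    Function0<JavaStreamingContext> createContextFunc =
        () -> createContext(checkpointDirectory);
    JavaStreamingContext ssc =
        JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);

    ssc.start();
    ssc.awaitTermination();
  }
}
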
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index b8e9e125ba..948d1a2111 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -18,20 +18,15 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -48,7 +43,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
*/
-
public final class JavaSqlNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@@ -70,39 +64,28 @@ public final class JavaSqlNetworkWordCount {
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
// Convert RDDs of the words DStream to DataFrame and run SQL query
- words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
- @Override
- public void call(JavaRDD<String> rdd, Time time) {
- SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
+ words.foreachRDD((rdd, time) -> {
+ SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
- // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
- JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
- @Override
- public JavaRecord call(String word) {
- JavaRecord record = new JavaRecord();
- record.setWord(word);
- return record;
- }
- });
- Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
+ // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
+ JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
+ JavaRecord record = new JavaRecord();
+ record.setWord(word);
+ return record;
+ });
+ Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
- // Creates a temporary view using the DataFrame
- wordsDataFrame.createOrReplaceTempView("words");
+ // Creates a temporary view using the DataFrame
+ wordsDataFrame.createOrReplaceTempView("words");
- // Do word count on table using SQL and print it
- Dataset<Row> wordCountsDataFrame =
- spark.sql("select word, count(*) as total from words group by word");
- System.out.println("========= " + time + "=========");
- wordCountsDataFrame.show();
- }
+ // Do word count on table using SQL and print it
+ Dataset<Row> wordCountsDataFrame =
+ spark.sql("select word, count(*) as total from words group by word");
+ System.out.println("========= " + time + "=========");
+ wordCountsDataFrame.show();
});
ssc.start();
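
The bean-to-DataFrame conversion rewritten above can be shown without the streaming machinery. The following minimal, self-contained sketch assumes a hypothetical WordRecord bean and in-memory sample data in place of the socket stream; it mirrors the lambda-based rdd.map, createDataFrame, temporary view, and SQL query from the hunk above, and is not part of this patch.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class WordsDataFrameSketch {

  /** Simple Java bean so createDataFrame can infer the schema. */
  public static class WordRecord implements java.io.Serializable {
    private String word;
    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("WordsDataFrameSketch").master("local[2]").getOrCreate();
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

    JavaRDD<String> words = sc.parallelize(Arrays.asList("spark", "streaming", "spark"));

    // Convert JavaRDD[String] to JavaRDD[bean class] with a lambda, then to a DataFrame.
    JavaRDD<WordRecord> rowRDD = words.map(word -> {
      WordRecord record = new WordRecord();
      record.setWord(word);
      return record;
    });
    Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, WordRecord.class);

    // Register a temporary view and run the same word-count query as the example.
    wordsDataFrame.createOrReplaceTempView("words");
    Dataset<Row> wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word");
    wordCountsDataFrame.show();

    spark.stop();
  }
}
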
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index ed36df852a..9d8bd7fd11 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,7 +18,6 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -72,32 +71,17 @@ public class JavaStatefulNetworkWordCount {
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
- JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- });
+ JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1));
// Update the cumulative count function
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
- new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> call(String word, Optional<Integer> one,
- State<Integer> state) {
- int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
- Tuple2<String, Integer> output = new Tuple2<>(word, sum);
- state.update(sum);
- return output;
- }
+ (word, one, state) -> {
+ int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
+ Tuple2<String, Integer> output = new Tuple2<>(word, sum);
+ state.update(sum);
+ return output;
};
// DStream made of get cumulative counts that get updated in every batch
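
For context, the three-argument mapping lambda above is what gets passed to mapWithState via StateSpec.function. A minimal, self-contained sketch follows, feeding two small batches through a queue stream so no socket is needed; the class name MapWithStateSketch, the checkpoint path, and the sample data are illustrative assumptions, not part of this patch.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class MapWithStateSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("MapWithStateSketch").setMaster("local[2]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    ssc.checkpoint("/tmp/map-with-state-sketch");  // mapWithState requires checkpointing

    // Feed two small batches through a queue stream so no socket is needed.
    Queue<JavaRDD<String>> queue = new LinkedList<>();
    queue.add(ssc.sparkContext().parallelize(Arrays.asList("hello", "world", "hello")));
    queue.add(ssc.sparkContext().parallelize(Arrays.asList("hello", "spark")));
    JavaDStream<String> words = ssc.queueStream(queue);

    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1));

    // Cumulative count, written as a three-argument lambda instead of an anonymous Function3.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        (word, one, state) -> {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        };

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordsDstream.mapWithState(StateSpec.function(mappingFunc));
    stateDstream.print();

    ssc.start();
    ssc.awaitTerminationOrTimeout(5000);
    ssc.stop();
  }
}
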