author     Zheng RuiFeng <ruifengz@foxmail.com>      2016-05-20 16:40:33 -0700
committer  Andrew Or <andrew@databricks.com>         2016-05-20 16:40:33 -0700
commit     127bf1bb07967e2e4f99ad7abaa7f6fab3b3f407 (patch)
tree       a127031cd361df2f1d895cb11489f8e183c76f73 /examples/src/main/java/org
parent     06c9f520714e07259c6f8ce6f9ea5a230a278cb5 (diff)
[SPARK-15031][EXAMPLE] Use SparkSession in examples
## What changes were proposed in this pull request?

Use `SparkSession` in the examples, per [SPARK-15031](https://issues.apache.org/jira/browse/SPARK-15031). The RDD-based `MLlib` API is no longer recommended, so the examples under `mllib` are left unchanged in this PR. A `StreamingContext` cannot be obtained directly from a `SparkSession`, so the streaming examples are left unchanged as well.

cc andrewor14

## How was this patch tested?

Manual tests with spark-submit.

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #13164 from zhengruifeng/use_sparksession_ii.
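The pattern applied throughout the files below can be summarized in one minimal sketch. This is not code from the commit: the class name and input path are placeholders, and it assumes the Spark 2.x API. A `SparkSession` replaces the `SparkConf`/`JavaSparkContext` pair, text input goes through the session's reader, and examples that still need RDD-only APIs wrap the session's underlying `SparkContext`.

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

// Minimal sketch of the migration pattern; class name and input path are placeholders.
public final class JavaSparkSessionSketch {
  public static void main(String[] args) {
    // One SparkSession replaces the SparkConf + JavaSparkContext pair.
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaSparkSessionSketch")
      .getOrCreate();

    // Text input is read through the session's reader instead of sc.textFile(...).
    // textFile() returns a Dataset<String>, so javaRDD() yields a JavaRDD<String>.
    JavaRDD<String> lines = spark.read().textFile("input.txt").javaRDD();

    // Examples that still need RDD-only APIs (parallelize, statusTracker, ...)
    // wrap the session's SparkContext rather than creating a second context.
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<Integer> numbers = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    System.out.println("lines: " + lines.count() + ", numbers: " + numbers.count());

    // Stopping the session also stops the underlying SparkContext.
    spark.stop();
  }
}
```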
Diffstat (limited to 'examples/src/main/java/org')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java             14
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java           12
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaPageRank.java           13
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java            12
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java  19
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaTC.java                 15
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaWordCount.java          15
7 files changed, 62 insertions, 38 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index f64155ce3c..ded442096c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -17,11 +17,10 @@
package org.apache.spark.examples;
-import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
import java.util.Arrays;
@@ -122,9 +121,12 @@ public final class JavaHdfsLR {
showWarning();
- SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
- JavaSparkContext sc = new JavaSparkContext(sparkConf);
- JavaRDD<String> lines = sc.textFile(args[0]);
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaHdfsLR")
+ .getOrCreate();
+
+ JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[1]);
@@ -152,6 +154,6 @@ public final class JavaHdfsLR {
System.out.print("Final w: ");
printWeights(w);
- sc.stop();
+ spark.stop();
}
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index ebb0687b14..7775443861 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -20,12 +20,13 @@ package org.apache.spark.examples;
import com.google.common.collect.Lists;
import scala.Tuple2;
import scala.Tuple3;
-import org.apache.spark.SparkConf;
+
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
import java.util.List;
@@ -99,9 +100,12 @@ public final class JavaLogQuery {
}
public static void main(String[] args) {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaLogQuery")
+ .getOrCreate();
- SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
@@ -123,6 +127,6 @@ public final class JavaLogQuery {
for (Tuple2<?,?> t : output) {
System.out.println(t._1() + "\t" + t._2());
}
- jsc.stop();
+ spark.stop();
}
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 229d123441..128b5ab17c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -26,14 +26,13 @@ import scala.Tuple2;
import com.google.common.collect.Iterables;
-import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SparkSession;
/**
* Computes the PageRank of URLs from an input file. Input file should
@@ -73,15 +72,17 @@ public final class JavaPageRank {
showWarning();
- SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank");
- JavaSparkContext ctx = new JavaSparkContext(sparkConf);
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaPageRank")
+ .getOrCreate();
// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
- JavaRDD<String> lines = ctx.textFile(args[0], 1);
+ JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
@@ -132,6 +133,6 @@ public final class JavaPageRank {
System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
}
- ctx.stop();
+ spark.stop();
}
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 04a57a6bfb..7df145e311 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -17,11 +17,11 @@
package org.apache.spark.examples;
-import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
@@ -33,8 +33,12 @@ import java.util.List;
public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
- SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaSparkPi")
+ .getOrCreate();
+
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
@@ -61,6 +65,6 @@ public final class JavaSparkPi {
System.out.println("Pi is roughly " + 4.0 * count / n);
- jsc.stop();
+ spark.stop();
}
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
index e68ec74c3e..6f899c772e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
@@ -17,13 +17,14 @@
package org.apache.spark.examples;
-import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.SparkSession;
+
import java.util.Arrays;
import java.util.List;
@@ -44,11 +45,15 @@ public final class JavaStatusTrackerDemo {
}
public static void main(String[] args) throws Exception {
- SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
- final JavaSparkContext sc = new JavaSparkContext(sparkConf);
+ SparkSession spark = SparkSession
+ .builder()
+ .appName(APP_NAME)
+ .getOrCreate();
+
+ final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Example of implementing a progress reporter for a simple job.
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
+ JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
new IdentityWithDelay<Integer>());
JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
while (!jobFuture.isDone()) {
@@ -58,13 +63,13 @@ public final class JavaStatusTrackerDemo {
continue;
}
int currentJobId = jobIds.get(jobIds.size() - 1);
- SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(currentJobId);
- SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
+ SparkJobInfo jobInfo = jsc.statusTracker().getJobInfo(currentJobId);
+ SparkStageInfo stageInfo = jsc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
" active, " + stageInfo.numCompletedTasks() + " complete");
}
System.out.println("Job results are: " + jobFuture.get());
- sc.stop();
+ spark.stop();
}
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index ca10384212..f12ca77ed1 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -25,10 +25,10 @@ import java.util.Set;
import scala.Tuple2;
-import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SparkSession;
/**
* Transitive closure on a graph, implemented in Java.
@@ -64,10 +64,15 @@ public final class JavaTC {
}
public static void main(String[] args) {
- SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
- JavaSparkContext sc = new JavaSparkContext(sparkConf);
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaTC")
+ .getOrCreate();
+
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+
Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
- JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
+ JavaPairRDD<Integer, Integer> tc = jsc.parallelizePairs(generateGraph(), slices).cache();
// Linear transitive closure: each round grows paths by one edge,
// by joining the graph's edges with the already-discovered paths.
@@ -94,6 +99,6 @@ public final class JavaTC {
} while (nextCount != oldCount);
System.out.println("TC has " + tc.count() + " edges.");
- sc.stop();
+ spark.stop();
}
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 3ff5412b93..1caee60e34 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -18,13 +18,13 @@
package org.apache.spark.examples;
import scala.Tuple2;
-import org.apache.spark.SparkConf;
+
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.Iterator;
@@ -41,9 +41,12 @@ public final class JavaWordCount {
System.exit(1);
}
- SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
- JavaSparkContext ctx = new JavaSparkContext(sparkConf);
- JavaRDD<String> lines = ctx.textFile(args[0], 1);
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaWordCount")
+ .getOrCreate();
+
+ JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
@@ -72,6 +75,6 @@ public final class JavaWordCount {
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
- ctx.stop();
+ spark.stop();
}
}