Diffstat (limited to 'docs')
-rw-r--r--  docs/building-spark.md                            32
-rw-r--r--  docs/index.md                                       6
-rw-r--r--  docs/mllib-linear-methods.md                        2
-rw-r--r--  docs/mllib-statistics.md                            7
-rw-r--r--  docs/programming-guide.md                          11
-rw-r--r--  docs/quick-start.md                                 9
-rw-r--r--  docs/streaming-custom-receivers.md                 10
-rw-r--r--  docs/streaming-kafka-0-10-integration.md           62
-rw-r--r--  docs/streaming-kafka-0-8-integration.md            41
-rw-r--r--  docs/streaming-programming-guide.md               219
-rw-r--r--  docs/structured-streaming-programming-guide.md     38
11 files changed, 136 insertions, 301 deletions
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 690c656bad..56b892696e 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -12,8 +12,8 @@ redirect_from: "building-with-maven.html"
## Apache Maven
The Maven-based build is the build of reference for Apache Spark.
-Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+.
-Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0.
+Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+.
+Note that support for Java 7 was removed as of Spark 2.2.0.
### Setting up Maven's Memory Usage
@@ -21,28 +21,18 @@ You'll need to configure Maven to use more memory than usual by setting `MAVEN_O
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
-When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS.
-
+(The `ReservedCodeCacheSize` setting is optional but recommended.)
If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following:
[INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
- [ERROR] PermGen space -> [Help 1]
-
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
[ERROR] Java heap space -> [Help 1]
- [INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes...
- OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
- OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
-
You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before.
**Note:**
* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options to the `MAVEN_OPTS` environment variable.
-* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`.
-* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8 and `build/mvn`. These warnings are harmless.
-
+* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`.
### build/mvn
@@ -224,20 +214,6 @@ To run test suites of a specific sub project as follows:
./build/sbt core/test
-## Running Java 8 Test Suites
-
-Running only Java 8 tests and nothing else.
-
- ./build/mvn install -DskipTests
- ./build/mvn -pl :java8-tests_2.11 test
-
-or
-
- ./build/sbt java8-tests/test
-
-Java 8 tests are automatically enabled when a Java 8 JDK is detected.
-If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests.
-
## PySpark pip installable
If you are building Spark for use in a Python environment and you wish to pip install it, you will first need to build the Spark JARs as described above. Then you can construct an sdist package suitable for setup.py and a pip-installable package.
diff --git a/docs/index.md b/docs/index.md
index 023e06ada3..19a9d3bfc6 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -26,11 +26,13 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy
locally on one machine --- all you need is to have `java` installed on your system `PATH`,
or the `JAVA_HOME` environment variable pointing to a Java installation.
-Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}}
+Spark runs on Java 8+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}}
uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version
({{site.SCALA_BINARY_VERSION}}.x).
-Note that support for Java 7 and Python 2.6 are deprecated as of Spark 2.0.0, and support for
+Note that support for Java 7 was removed as of Spark 2.2.0.
+
+Note that support for Python 2.6 is deprecated as of Spark 2.0.0, and support for
Scala 2.10 and versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0, and may be
removed in Spark 2.2.0.
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 3085539b40..034e89e250 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -222,7 +222,7 @@ svmAlg.optimizer()
.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater());
-final SVMModel modelL1 = svmAlg.run(training.rdd());
+SVMModel modelL1 = svmAlg.run(training.rdd());
{% endhighlight %}
In order to run the above application, follow the instructions
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 430c069045..c29400af85 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -317,12 +317,7 @@ JavaSparkContext jsc = ...
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
// Apply a transform to get a random double RDD following `N(1, 4)`.
-JavaDoubleRDD v = u.map(
- new Function<Double, Double>() {
- public Double call(Double x) {
- return 1.0 + 2.0 * x;
- }
- });
+JavaDoubleRDD v = u.mapToDouble(x -> 1.0 + 2.0 * x);
{% endhighlight %}
</div>
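For context, the updated snippet above can be run end to end as follows. This is a minimal sketch, not part of the doc change: the app name and local master are illustrative, and only the standard `RandomRDDs.normalJavaRDD` helper and `JavaDoubleRDD` API are assumed.

{% highlight java %}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
import static org.apache.spark.mllib.random.RandomRDDs.normalJavaRDD;

public class RandomRDDExample {
  public static void main(String[] args) {
    // Local master and app name are placeholders for a quick test run.
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("RandomRDDExample"));

    // 1,000,000 draws from the standard normal N(0, 1), in 10 partitions.
    JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
    // Shift and scale to N(1, 4) with a lambda instead of an anonymous Function.
    JavaDoubleRDD v = u.mapToDouble(x -> 1.0 + 2.0 * x);

    System.out.println("mean ~ " + v.mean() + ", variance ~ " + v.variance());
    jsc.stop();
  }
}
{% endhighlight %}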
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index db8b048fce..6740dbe001 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -54,12 +54,12 @@ import org.apache.spark.SparkConf
<div data-lang="java" markdown="1">
-Spark {{site.SPARK_VERSION}} works with Java 7 and higher. If you are using Java 8, Spark supports
+Spark {{site.SPARK_VERSION}} supports
[lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
for concisely writing functions; otherwise you can use the classes in the
[org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
-Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0.
+Note that support for Java 7 was removed in Spark 2.2.0.
To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
@@ -295,11 +295,6 @@ JavaRDD<Integer> distData = sc.parallelize(data);
Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list.
We describe operations on distributed datasets later on.
-**Note:** *In this guide, we'll often use the concise Java 8 lambda syntax to specify Java functions, but
-in older versions of Java you can implement the interfaces in the
-[org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
-We describe [passing functions to Spark](#passing-functions-to-spark) in more detail below.*
-
</div>
<div data-lang="python" markdown="1">
@@ -658,7 +653,7 @@ There are two ways to create such functions:
* Implement the Function interfaces in your own class, either as an anonymous inner class or a named one,
and pass an instance of it to Spark.
-* In Java 8, use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
+* Use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
to concisely define an implementation.
While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs
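As a concrete illustration of the two options above, here is a minimal sketch assuming `lines` is an existing `JavaRDD<String>`; both forms produce the same result:

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Option 1: implement the Function interface as an anonymous inner class.
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  @Override
  public Integer call(String s) {
    return s.length();
  }
});

// Option 2: the same operation written as a lambda expression.
JavaRDD<Integer> lineLengthsLambda = lines.map(s -> s.length());
{% endhighlight %}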
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 0836c602fe..04ac278762 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -320,13 +320,8 @@ public class SimpleApp {
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
- long numAs = logData.filter(new Function<String, Boolean>() {
- public Boolean call(String s) { return s.contains("a"); }
- }).count();
-
- long numBs = logData.filter(new Function<String, Boolean>() {
- public Boolean call(String s) { return s.contains("b"); }
- }).count();
+ long numAs = logData.filter(s -> s.contains("a")).count();
+ long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 117996db9d..d4ddcb16bd 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -113,15 +113,13 @@ public class JavaCustomReceiver extends Receiver<String> {
port = port_;
}
+ @Override
public void onStart() {
// Start the thread that receives data over a connection
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
+ new Thread(this::receive).start();
}
+ @Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
@@ -189,7 +187,7 @@ The full source code is in the example [CustomReceiver.scala]({{site.SPARK_GITHU
{% highlight java %}
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
-JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
+JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...
{% endhighlight %}
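The `flatMap` body is elided above; one way it might be filled in, mirroring the word-splitting used elsewhere in the streaming guide (a sketch, not part of the example source):

{% highlight java %}
import java.util.Arrays;

// Split each line received by the custom receiver into words.
JavaDStream<String> words =
    customReceiverStream.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
{% endhighlight %}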
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 6ef54ac210..e383701316 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -68,20 +68,14 @@ kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
-final JavaInputDStream<ConsumerRecord<String, String>> stream =
+JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
-stream.mapToPair(
- new PairFunction<ConsumerRecord<String, String>, String, String>() {
- @Override
- public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
- return new Tuple2<>(record.key(), record.value());
- }
- })
+stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
{% endhighlight %}
</div>
</div>
@@ -162,19 +156,13 @@ stream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
- @Override
- public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
- OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
- }
- });
- }
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ rdd.foreachPartition(consumerRecords -> {
+ OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+ });
});
{% endhighlight %}
</div>
@@ -205,14 +193,11 @@ As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if calle
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- // some time later, after outputs have completed
- ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
- }
+ // some time later, after outputs have completed
+ ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
{% endhighlight %}
</div>
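A hedged sketch of how "some time later, after outputs have completed" often looks in practice: write each batch out first, then commit that batch's offsets, using the same `stream`, `HasOffsetRanges`, and `CanCommitOffsets` as in the snippets above. `saveToExternalStore` is a hypothetical user-supplied output action, not a Spark or Kafka API.

{% highlight java %}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // Write the batch out; saveToExternalStore is a placeholder for your sink.
  rdd.foreachPartition(records -> {
    while (records.hasNext()) {
      saveToExternalStore(records.next());
    }
  });

  // Only after the output above has completed, commit the offsets for this batch.
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
{% endhighlight %}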
@@ -268,21 +253,18 @@ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirec
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-
- Object results = yourCalculation(rdd);
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+ Object results = yourCalculation(rdd);
- // begin your transaction
+ // begin your transaction
- // update results
- // update offsets where the end of existing offsets matches the beginning of this batch of offsets
- // assert that offsets were updated correctly
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
- // end your transaction
- }
+ // end your transaction
});
{% endhighlight %}
</div>
diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md
index 58b17aa4ce..24a3e4cdbb 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application.
</div>
<div data-lang="java" markdown="1">
// Hold a reference to the current offset ranges, so it can be used downstream
- final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
-
- directKafkaStream.transformToPair(
- new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
- @Override
- public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
- OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- offsetRanges.set(offsets);
- return rdd;
- }
- }
- ).map(
+ AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+ directKafkaStream.transformToPair(rdd -> {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ return rdd;
+ }).map(
...
- ).foreachRDD(
- new Function<JavaPairRDD<String, String>, Void>() {
- @Override
- public Void call(JavaPairRDD<String, String> rdd) throws IOException {
- for (OffsetRange o : offsetRanges.get()) {
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
- );
- }
- ...
- return null;
- }
- }
- );
+ ).foreachRDD(rdd -> {
+ for (OffsetRange o : offsetRanges.get()) {
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+ );
+ }
+ ...
+ });
</div>
<div data-lang="python" markdown="1">
offsetRanges = []
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index a878971608..abd4ac9653 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -163,12 +163,7 @@ space into words.
{% highlight java %}
// Split each line into words
-JavaDStream<String> words = lines.flatMap(
- new FlatMapFunction<String, String>() {
- @Override public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- });
+JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
{% endhighlight %}
`flatMap` is a DStream operation that creates a new DStream by
@@ -183,18 +178,8 @@ Next, we want to count these words.
{% highlight java %}
// Count each word in each batch
-JavaPairDStream<String, Integer> pairs = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- });
-JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
+JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
@@ -836,11 +821,9 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count.
{% highlight java %}
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- Integer newSum = ... // add the new values with the previous running count to get the new count
- return Optional.of(newSum);
- }
+ (values, state) -> {
+    Integer newSum = ... // add the new values to the previous running count to get the new count
+ return Optional.of(newSum);
};
{% endhighlight %}
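A sketch that completes the elided body, assuming the running count is simply the sum of all values seen so far, and that shows how the function is applied to the `pairs` DStream used throughout this guide:

{% highlight java %}
import java.util.List;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    // Start from the previous running count (0 if this key is new) and add the new values.
    Integer newSum = state.orElse(0);
    for (Integer value : values) {
      newSum += value;
    }
    return Optional.of(newSum);
  };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
{% endhighlight %}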
@@ -915,15 +898,12 @@ val cleanedDStream = wordCounts.transform { rdd =>
{% highlight java %}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
-final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
+JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
-JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
- new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
- rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
- ...
- }
- });
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
+ rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
+ ...
+});
{% endhighlight %}
</div>
@@ -986,15 +966,8 @@ val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Se
<div data-lang="java" markdown="1">
{% highlight java %}
-// Reduce function adding two integers, defined separately for clarity
-Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
- @Override public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
-};
-
// Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
{% endhighlight %}
</div>
@@ -1141,14 +1114,7 @@ val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
{% highlight java %}
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
-JavaPairDStream<String, String> joinedStream = windowedStream.transform(
- new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
- @Override
- public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
- return rdd.join(dataset);
- }
- }
-);
+JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
@@ -1248,17 +1214,11 @@ dstream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- final Connection connection = createNewConnection(); // executed at the driver
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String record) {
- connection.send(record); // executed at the worker
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ Connection connection = createNewConnection(); // executed at the driver
+ rdd.foreach(record -> {
+ connection.send(record); // executed at the worker
+ });
});
{% endhighlight %}
</div>
@@ -1297,18 +1257,12 @@ dstream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String record) {
- Connection connection = createNewConnection();
- connection.send(record);
- connection.close();
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ rdd.foreach(record -> {
+ Connection connection = createNewConnection();
+ connection.send(record);
+ connection.close();
+ });
});
{% endhighlight %}
</div>
@@ -1344,20 +1298,14 @@ dstream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
- @Override
- public void call(Iterator<String> partitionOfRecords) {
- Connection connection = createNewConnection();
- while (partitionOfRecords.hasNext()) {
- connection.send(partitionOfRecords.next());
- }
- connection.close();
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ rdd.foreachPartition(partitionOfRecords -> {
+ Connection connection = createNewConnection();
+ while (partitionOfRecords.hasNext()) {
+ connection.send(partitionOfRecords.next());
+ }
+ connection.close();
+ });
});
{% endhighlight %}
</div>
@@ -1396,21 +1344,15 @@ dstream.foreachRDD { rdd =>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
- @Override
- public void call(Iterator<String> partitionOfRecords) {
- // ConnectionPool is a static, lazily initialized pool of connections
- Connection connection = ConnectionPool.getConnection();
- while (partitionOfRecords.hasNext()) {
- connection.send(partitionOfRecords.next());
- }
- ConnectionPool.returnConnection(connection); // return to the pool for future reuse
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ rdd.foreachPartition(partitionOfRecords -> {
+ // ConnectionPool is a static, lazily initialized pool of connections
+ Connection connection = ConnectionPool.getConnection();
+ while (partitionOfRecords.hasNext()) {
+ connection.send(partitionOfRecords.next());
+ }
+ ConnectionPool.returnConnection(connection); // return to the pool for future reuse
+ });
});
{% endhighlight %}
</div>
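The guide leaves `ConnectionPool` and `Connection` abstract. One possible shape for a "static, lazily initialized pool of connections", sketched generically since the actual connection type depends on the sink; the class and method names here are illustrative, not a Spark API:

{% highlight java %}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Connections are created lazily on first use and returned for reuse by later
// batches/partitions running in the same executor JVM.
public final class SimpleConnectionPool<C> {
  private final Queue<C> idle = new ConcurrentLinkedQueue<>();
  private final Supplier<C> factory;

  public SimpleConnectionPool(Supplier<C> factory) {
    this.factory = factory;
  }

  public C getConnection() {
    C connection = idle.poll();
    return (connection != null) ? connection : factory.get();
  }

  public void returnConnection(C connection) {
    idle.offer(connection);
  }
}
{% endhighlight %}

In practice the pool would typically be held in a static field (or a lazily initialized singleton) so that all tasks in the same executor share it, which is what the comment in the snippet above refers to.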
@@ -1495,35 +1437,26 @@ public class JavaRow implements java.io.Serializable {
JavaDStream<String> words = ...
-words.foreachRDD(
- new Function2<JavaRDD<String>, Time, Void>() {
- @Override
- public Void call(JavaRDD<String> rdd, Time time) {
-
- // Get the singleton instance of SparkSession
- SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
+words.foreachRDD((rdd, time) -> {
+ // Get the singleton instance of SparkSession
+ SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
- // Convert RDD[String] to RDD[case class] to DataFrame
- JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
- public JavaRow call(String word) {
- JavaRow record = new JavaRow();
- record.setWord(word);
- return record;
- }
- });
- DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
+  // Convert JavaRDD<String> to JavaRDD<JavaRow> to DataFrame
+ JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
+ JavaRow record = new JavaRow();
+ record.setWord(word);
+ return record;
+ });
+  Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
- // Creates a temporary view using the DataFrame
- wordsDataFrame.createOrReplaceTempView("words");
+ // Creates a temporary view using the DataFrame
+ wordsDataFrame.createOrReplaceTempView("words");
- // Do word count on table using SQL and print it
- DataFrame wordCountsDataFrame =
- spark.sql("select word, count(*) as total from words group by word");
- wordCountsDataFrame.show();
- return null;
- }
- }
-);
+ // Do word count on table using SQL and print it
+  Dataset<Row> wordCountsDataFrame =
+ spark.sql("select word, count(*) as total from words group by word");
+ wordCountsDataFrame.show();
+});
{% endhighlight %}
See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
@@ -1883,27 +1816,21 @@ class JavaDroppedWordsCounter {
}
}
-wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
- @Override
- public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
- // Get or register the blacklist Broadcast
- final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
- // Get or register the droppedWordsCounter Accumulator
- final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
- // Use blacklist to drop words and use droppedWordsCounter to count them
- String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
- if (blacklist.value().contains(wordCount._1())) {
- droppedWordsCounter.add(wordCount._2());
- return false;
- } else {
- return true;
- }
- }
- }).collect().toString();
- String output = "Counts at time " + time + " " + counts;
- }
+wordCounts.foreachRDD((rdd, time) -> {
+ // Get or register the blacklist Broadcast
+ Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+ // Get or register the droppedWordsCounter Accumulator
+ LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+ // Use blacklist to drop words and use droppedWordsCounter to count them
+ String counts = rdd.filter(wordCount -> {
+ if (blacklist.value().contains(wordCount._1())) {
+ droppedWordsCounter.add(wordCount._2());
+ return false;
+ } else {
+ return true;
+ }
+ }).collect().toString();
+ String output = "Counts at time " + time + " " + counts;
}
{% endhighlight %}
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b816072cb8..ad3b2fb26d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -103,13 +103,7 @@ Dataset<Row> lines = spark
// Split the lines into words
Dataset<String> words = lines
.as(Encoders.STRING())
- .flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING());
+ .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -517,7 +511,7 @@ val csvDF = spark
SparkSession spark = ...
// Read text from socket
-Dataset[Row] socketDF = spark
+Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
@@ -530,7 +524,7 @@ socketDF.printSchema();
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
-Dataset[Row] csvDF = spark
+Dataset<Row> csvDF = spark
.readStream()
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
@@ -625,33 +619,15 @@ Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); //
// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
-ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
- @Override
- public boolean call(DeviceData value) throws Exception {
- return value.getSignal() > 10;
- }
-}).map(new MapFunction<DeviceData, String>() {
- @Override
- public String call(DeviceData value) throws Exception {
- return value.getDevice();
- }
-}, Encoders.STRING());
+ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
+ .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API
// Running average signal for each device type
-ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
- @Override
- public String call(DeviceData value) throws Exception {
- return value.getDeviceType();
- }
-}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
- @Override
- public Double call(DeviceData value) throws Exception {
- return value.getSignal();
- }
-}));
+ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
+ .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
{% endhighlight %}
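For reference, the typed-API snippet above relies on explicit functional-interface casts. A sketch of the same chain with the imports it assumes (`ds` and `DeviceData` come from the earlier part of this example in the guide):

{% highlight java %}
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// The casts pin each lambda to the Java functional-interface overload of
// filter/map, making the intended overload unambiguous to the compiler.
Dataset<String> highSignalDevices = ds
  .filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
{% endhighlight %}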