aboutsummaryrefslogtreecommitdiff
path: root/docs/structured-streaming-programming-guide.md
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
committerSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
commit0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /docs/structured-streaming-programming-guide.md
parent3871d94a695d47169720e877f77ff1e4bede43ee (diff)
downloadspark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.gz
spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.bz2
spark-0e2405490f2056728d1353abbac6f3ea177ae533.zip
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #16871 from srowen/SPARK-19493.
Diffstat (limited to 'docs/structured-streaming-programming-guide.md')
-rw-r--r--docs/structured-streaming-programming-guide.md38
1 files changed, 7 insertions, 31 deletions
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b816072cb8..ad3b2fb26d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -103,13 +103,7 @@ Dataset<Row> lines = spark
// Split the lines into words
Dataset<String> words = lines
.as(Encoders.STRING())
- .flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING());
+ .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -517,7 +511,7 @@ val csvDF = spark
SparkSession spark = ...
// Read text from socket
-Dataset[Row] socketDF = spark
+Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
@@ -530,7 +524,7 @@ socketDF.printSchema();
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
-Dataset[Row] csvDF = spark
+Dataset<Row> csvDF = spark
.readStream()
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
@@ -625,33 +619,15 @@ Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); //
// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
-ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
- @Override
- public boolean call(DeviceData value) throws Exception {
- return value.getSignal() > 10;
- }
-}).map(new MapFunction<DeviceData, String>() {
- @Override
- public String call(DeviceData value) throws Exception {
- return value.getDevice();
- }
-}, Encoders.STRING());
+ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
+ .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API
// Running average signal for each device type
-ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
- @Override
- public String call(DeviceData value) throws Exception {
- return value.getDeviceType();
- }
-}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
- @Override
- public Double call(DeviceData value) throws Exception {
- return value.getSignal();
- }
-}));
+ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
+ .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
{% endhighlight %}