From 1426a080528bdb470b5e81300d892af45dd188bf Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 18 Jul 2016 23:07:59 -0700 Subject: [SPARK-16303][DOCS][EXAMPLES] Minor Scala/Java example update ## What changes were proposed in this pull request? This PR moves one and the last hard-coded Scala example snippet from the SQL programming guide into `SparkSqlExample.scala`. It also renames all Scala/Java example files so that all "Sql" in the file names are updated to "SQL". ## How was this patch tested? Manually verified the generated HTML page. Author: Cheng Lian Closes #14245 from liancheng/minor-scala-example-update. --- docs/sql-programming-guide.md | 57 ++-- .../examples/sql/JavaSQLDataSourceExample.java | 217 +++++++++++++ .../spark/examples/sql/JavaSparkSQLExample.java | 336 +++++++++++++++++++++ .../spark/examples/sql/JavaSparkSqlExample.java | 336 --------------------- .../examples/sql/JavaSqlDataSourceExample.java | 217 ------------- .../spark/examples/sql/SQLDataSourceExample.scala | 148 +++++++++ .../spark/examples/sql/SparkSQLExample.scala | 254 ++++++++++++++++ .../spark/examples/sql/SparkSqlExample.scala | 254 ---------------- .../spark/examples/sql/SqlDataSourceExample.scala | 148 --------- 9 files changed, 983 insertions(+), 984 deletions(-) create mode 100644 examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java create mode 100644 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java delete mode 100644 examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java delete mode 100644 examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4413fdd2f6..71f3ee40a3 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -65,14 +65,14 @@ Throughout this document, we will often refer to Scala/Java Datasets of `Row`s a The entry point into all functionality in Spark is the [`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder()`: -{% include_example init_session scala/org/apache/spark/examples/sql/SparkSqlExample.scala %} +{% include_example init_session scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}
 The entry point into all functionality in Spark is the [`SparkSession`](api/java/index.html#org.apache.spark.sql.SparkSession) class. To create a basic `SparkSession`, just use `SparkSession.builder()`:

-{% include_example init_session java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example init_session java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}

@@ -105,7 +105,7 @@ from a Hive table, or from [Spark data sources](#data-sources).

 As an example, the following creates a DataFrame based on the content of a JSON file:

-{% include_example create_df scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
+{% include_example create_df scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}

@@ -114,7 +114,7 @@ from a Hive table, or from [Spark data sources](#data-sources).

 As an example, the following creates a DataFrame based on the content of a JSON file:

-{% include_example create_df java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example create_df java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
@@ -155,7 +155,7 @@ Here we include some basic examples of structured data processing using Datasets

-{% include_example untyped_ops scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
+{% include_example untyped_ops scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}

 For a complete list of the types of operations that can be performed on a Dataset refer to the [API Documentation](api/scala/index.html#org.apache.spark.sql.Dataset).
@@ -164,7 +164,7 @@ In addition to simple column references and expressions, Datasets also have a ri

-{% include_example untyped_ops java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example untyped_ops java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}

 For a complete list of the types of operations that can be performed on a Dataset refer to the [API Documentation](api/java/org/apache/spark/sql/Dataset.html).
@@ -249,13 +249,13 @@ In addition to simple column references and expressions, DataFrames also have a
 The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.

-{% include_example run_sql scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
+{% include_example run_sql scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}

 The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `Dataset<Row>`.

-{% include_example run_sql java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example run_sql java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
@@ -287,11 +287,11 @@ the bytes back into an object.

-{% include_example create_ds scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
+{% include_example create_ds scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}

-{% include_example create_ds java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example create_ds java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}

@@ -318,7 +318,7 @@ reflection and become the names of the columns. Case classes can also be nested
 types such as `Seq`s or `Array`s. This RDD can be implicitly converted to a DataFrame and then be
 registered as a table. Tables can be used in subsequent SQL statements.

-{% include_example schema_inferring scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
+{% include_example schema_inferring scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}

@@ -330,7 +330,7 @@ does not support JavaBeans that contain `Map` field(s). Nested JavaBeans and `Li
 fields are supported though. You can create a JavaBean by creating a class that implements
 Serializable and has getters and setters for all of its fields.

-{% include_example schema_inferring java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example schema_inferring java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}

@@ -385,7 +385,7 @@ by `SparkSession`.

 For example:

-{% include_example programmatic_schema scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
+{% include_example programmatic_schema scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}

@@ -403,7 +403,7 @@ by `SparkSession`.

 For example:

-{% include_example programmatic_schema java/org/apache/spark/examples/sql/JavaSparkSqlExample.java %}
+{% include_example programmatic_schema java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
@@ -472,11 +472,11 @@ In the simplest form, the default data source (`parquet` unless otherwise config

-{% include_example generic_load_save_functions scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
+{% include_example generic_load_save_functions scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

-{% include_example generic_load_save_functions java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
+{% include_example generic_load_save_functions java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}

@@ -507,11 +507,11 @@ using this syntax.

-{% include_example manual_load_options scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
+{% include_example manual_load_options scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

-{% include_example manual_load_options java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
+{% include_example manual_load_options java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}

@@ -538,11 +538,11 @@ file directly with SQL.

-{% include_example direct_sql scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
+{% include_example direct_sql scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

-{% include_example direct_sql java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
+{% include_example direct_sql java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
@@ -633,11 +633,11 @@ Using the data from the above example:

-{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
+{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

-{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
+{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}

@@ -766,11 +766,11 @@ turned it off by default starting from 1.5.0. You may enable it by

-{% include_example schema_merging scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
+{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

-{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
+{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}

@@ -973,7 +973,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
 line must contain a separate, self-contained valid JSON object. As a consequence, a regular
 multi-line JSON file will most often fail.

-{% include_example json_dataset scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala %}
+{% include_example json_dataset scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}

@@ -985,7 +985,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
 line must contain a separate, self-contained valid JSON object. As a consequence, a regular
 multi-line JSON file will most often fail.

-{% include_example json_dataset java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java %}
+{% include_example json_dataset java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
@@ -1879,9 +1879,8 @@ Spark SQL and DataFrames support the following data types: All data types of Spark SQL are located in the package `org.apache.spark.sql.types`. You can access them by doing -{% highlight scala %} -import org.apache.spark.sql.types._ -{% endhighlight %} + +{% include_example data_types scala/org/apache/spark/examples/sql/SparkSQLExample.scala %} diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java new file mode 100644 index 0000000000..2b94b9f114 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql; + +// $example on:schema_merging$ +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +// $example off:schema_merging$ + +// $example on:basic_parquet_example$ +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +// import org.apache.spark.sql.Encoders; +// $example on:schema_merging$ +// $example on:json_dataset$ +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +// $example off:json_dataset$ +// $example off:schema_merging$ +// $example off:basic_parquet_example$ +import org.apache.spark.sql.SparkSession; + +public class JavaSQLDataSourceExample { + + // $example on:schema_merging$ + public static class Square implements Serializable { + private int value; + private int square; + + // Getters and setters... + // $example off:schema_merging$ + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + + public int getSquare() { + return square; + } + + public void setSquare(int square) { + this.square = square; + } + // $example on:schema_merging$ + } + // $example off:schema_merging$ + + // $example on:schema_merging$ + public static class Cube implements Serializable { + private int value; + private int cube; + + // Getters and setters... 
+ // $example off:schema_merging$ + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + + public int getCube() { + return cube; + } + + public void setCube(int cube) { + this.cube = cube; + } + // $example on:schema_merging$ + } + // $example off:schema_merging$ + + public static void main(String[] args) { + SparkSession spark = SparkSession + .builder() + .appName("Java Spark SQL Data Sources Example") + .config("spark.some.config.option", "some-value") + .getOrCreate(); + + runBasicDataSourceExample(spark); + runBasicParquetExample(spark); + runParquetSchemaMergingExample(spark); + runJsonDatasetExample(spark); + + spark.stop(); + } + + private static void runBasicDataSourceExample(SparkSession spark) { + // $example on:generic_load_save_functions$ + Dataset usersDF = spark.read().load("examples/src/main/resources/users.parquet"); + usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); + // $example off:generic_load_save_functions$ + // $example on:manual_load_options$ + Dataset peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json"); + peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); + // $example off:manual_load_options$ + // $example on:direct_sql$ + Dataset sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); + // $example off:direct_sql$ + } + + private static void runBasicParquetExample(SparkSession spark) { + // $example on:basic_parquet_example$ + Dataset peopleDF = spark.read().json("examples/src/main/resources/people.json"); + + // DataFrames can be saved as Parquet files, maintaining the schema information + peopleDF.write().parquet("people.parquet"); + + // Read in the Parquet file created above. 
+ // Parquet files are self-describing so the schema is preserved + // The result of loading a parquet file is also a DataFrame + Dataset parquetFileDF = spark.read().parquet("people.parquet"); + + // Parquet files can also be used to create a temporary view and then used in SQL statements + parquetFileDF.createOrReplaceTempView("parquetFile"); + Dataset namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); + Dataset namesDS = namesDF.map(new MapFunction() { + public String call(Row row) { + return "Name: " + row.getString(0); + } + }, Encoders.STRING()); + namesDS.show(); + // +------------+ + // | value| + // +------------+ + // |Name: Justin| + // +------------+ + // $example off:basic_parquet_example$ + } + + private static void runParquetSchemaMergingExample(SparkSession spark) { + // $example on:schema_merging$ + List squares = new ArrayList<>(); + for (int value = 1; value <= 5; value++) { + Square square = new Square(); + square.setValue(value); + square.setSquare(value * value); + squares.add(square); + } + + // Create a simple DataFrame, store into a partition directory + Dataset squaresDF = spark.createDataFrame(squares, Square.class); + squaresDF.write().parquet("data/test_table/key=1"); + + List cubes = new ArrayList<>(); + for (int value = 6; value <= 10; value++) { + Cube cube = new Cube(); + cube.setValue(value); + cube.setCube(value * value * value); + cubes.add(cube); + } + + // Create another DataFrame in a new partition directory, + // adding a new column and dropping an existing column + Dataset cubesDF = spark.createDataFrame(cubes, Cube.class); + cubesDF.write().parquet("data/test_table/key=2"); + + // Read the partitioned table + Dataset mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table"); + mergedDF.printSchema(); + + // The final schema consists of all 3 columns in the Parquet files together + // with the partitioning column appeared in the partition directory paths + // root + // |-- value: int (nullable = true) + // |-- square: int (nullable = true) + // |-- cube: int (nullable = true) + // |-- key : int (nullable = true) + // $example off:schema_merging$ + } + + private static void runJsonDatasetExample(SparkSession spark) { + // $example on:json_dataset$ + // A JSON dataset is pointed to by path. + // The path can be either a single text file or a directory storing text files + Dataset people = spark.read().json("examples/src/main/resources/people.json"); + + // The inferred schema can be visualized using the printSchema() method + people.printSchema(); + // root + // |-- age: long (nullable = true) + // |-- name: string (nullable = true) + + // Creates a temporary view using the DataFrame + people.createOrReplaceTempView("people"); + + // SQL statements can be run by using the sql methods provided by spark + Dataset namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); + namesDF.show(); + // +------+ + // | name| + // +------+ + // |Justin| + // +------+ + // $example off:json_dataset$ + } + +} diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java new file mode 100644 index 0000000000..afc18078d4 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql; + +// $example on:programmatic_schema$ +import java.util.ArrayList; +import java.util.List; +// $example off:programmatic_schema$ +// $example on:create_ds$ +import java.util.Arrays; +import java.util.Collections; +import java.io.Serializable; +// $example off:create_ds$ + +// $example on:schema_inferring$ +// $example on:programmatic_schema$ +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +// $example off:programmatic_schema$ +// $example on:create_ds$ +import org.apache.spark.api.java.function.MapFunction; +// $example on:create_df$ +// $example on:run_sql$ +// $example on:programmatic_schema$ +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +// $example off:programmatic_schema$ +// $example off:create_df$ +// $example off:run_sql$ +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +// $example off:create_ds$ +// $example off:schema_inferring$ +import org.apache.spark.sql.RowFactory; +// $example on:init_session$ +import org.apache.spark.sql.SparkSession; +// $example off:init_session$ +// $example on:programmatic_schema$ +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +// $example off:programmatic_schema$ + +// $example on:untyped_ops$ +// col("...") is preferable to df.col("...") +import static org.apache.spark.sql.functions.col; +// $example off:untyped_ops$ + +public class JavaSparkSQLExample { + // $example on:create_ds$ + public static class Person implements Serializable { + private String name; + private int age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + } + // $example off:create_ds$ + + public static void main(String[] args) { + // $example on:init_session$ + SparkSession spark = SparkSession + .builder() + .appName("Java Spark SQL Example") + .config("spark.some.config.option", "some-value") + .getOrCreate(); + // $example off:init_session$ + + runBasicDataFrameExample(spark); + runDatasetCreationExample(spark); + runInferSchemaExample(spark); + runProgrammaticSchemaExample(spark); + + spark.stop(); + } + + private static void runBasicDataFrameExample(SparkSession spark) { + // $example on:create_df$ + Dataset df = spark.read().json("examples/src/main/resources/people.json"); + + // Displays the content of the DataFrame to stdout + df.show(); + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:create_df$ + + // $example on:untyped_ops$ + // Print the schema in a tree format + df.printSchema(); + 
// root + // |-- age: long (nullable = true) + // |-- name: string (nullable = true) + + // Select only the "name" column + df.select("name").show(); + // +-------+ + // | name| + // +-------+ + // |Michael| + // | Andy| + // | Justin| + // +-------+ + + // Select everybody, but increment the age by 1 + df.select(col("name"), col("age").plus(1)).show(); + // +-------+---------+ + // | name|(age + 1)| + // +-------+---------+ + // |Michael| null| + // | Andy| 31| + // | Justin| 20| + // +-------+---------+ + + // Select people older than 21 + df.filter(col("age").gt(21)).show(); + // +---+----+ + // |age|name| + // +---+----+ + // | 30|Andy| + // +---+----+ + + // Count people by age + df.groupBy("age").count().show(); + // +----+-----+ + // | age|count| + // +----+-----+ + // | 19| 1| + // |null| 1| + // | 30| 1| + // +----+-----+ + // $example off:untyped_ops$ + + // $example on:run_sql$ + // Register the DataFrame as a SQL temporary view + df.createOrReplaceTempView("people"); + + Dataset sqlDF = spark.sql("SELECT * FROM people"); + sqlDF.show(); + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:run_sql$ + } + + private static void runDatasetCreationExample(SparkSession spark) { + // $example on:create_ds$ + // Create an instance of a Bean class + Person person = new Person(); + person.setName("Andy"); + person.setAge(32); + + // Encoders are created for Java beans + Encoder personEncoder = Encoders.bean(Person.class); + Dataset javaBeanDS = spark.createDataset( + Collections.singletonList(person), + personEncoder + ); + javaBeanDS.show(); + // +---+----+ + // |age|name| + // +---+----+ + // | 32|Andy| + // +---+----+ + + // Encoders for most common types are provided in class Encoders + Encoder integerEncoder = Encoders.INT(); + Dataset primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); + Dataset transformedDS = primitiveDS.map(new MapFunction() { + @Override + public Integer call(Integer value) throws Exception { + return value + 1; + } + }, integerEncoder); + transformedDS.collect(); // Returns [2, 3, 4] + + // DataFrames can be converted to a Dataset by providing a class. 
Mapping based on name + String path = "examples/src/main/resources/people.json"; + Dataset peopleDS = spark.read().json(path).as(personEncoder); + peopleDS.show(); + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:create_ds$ + } + + private static void runInferSchemaExample(SparkSession spark) { + // $example on:schema_inferring$ + // Create an RDD of Person objects from a text file + JavaRDD peopleRDD = spark.read() + .textFile("examples/src/main/resources/people.txt") + .javaRDD() + .map(new Function() { + @Override + public Person call(String line) throws Exception { + String[] parts = line.split(","); + Person person = new Person(); + person.setName(parts[0]); + person.setAge(Integer.parseInt(parts[1].trim())); + return person; + } + }); + + // Apply a schema to an RDD of JavaBeans to get a DataFrame + Dataset peopleDF = spark.createDataFrame(peopleRDD, Person.class); + // Register the DataFrame as a temporary view + peopleDF.createOrReplaceTempView("people"); + + // SQL statements can be run by using the sql methods provided by spark + Dataset teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); + + // The columns of a row in the result can be accessed by field index + Encoder stringEncoder = Encoders.STRING(); + Dataset teenagerNamesByIndexDF = teenagersDF.map(new MapFunction() { + @Override + public String call(Row row) throws Exception { + return "Name: " + row.getString(0); + } + }, stringEncoder); + teenagerNamesByIndexDF.show(); + // +------------+ + // | value| + // +------------+ + // |Name: Justin| + // +------------+ + + // or by field name + Dataset teenagerNamesByFieldDF = teenagersDF.map(new MapFunction() { + @Override + public String call(Row row) throws Exception { + return "Name: " + row.getAs("name"); + } + }, stringEncoder); + teenagerNamesByFieldDF.show(); + // +------------+ + // | value| + // +------------+ + // |Name: Justin| + // +------------+ + // $example off:schema_inferring$ + } + + private static void runProgrammaticSchemaExample(SparkSession spark) { + // $example on:programmatic_schema$ + // Create an RDD + JavaRDD peopleRDD = spark.sparkContext() + .textFile("examples/src/main/resources/people.txt", 1) + .toJavaRDD(); + + // The schema is encoded in a string + String schemaString = "name age"; + + // Generate the schema based on the string of schema + List fields = new ArrayList<>(); + for (String fieldName : schemaString.split(" ")) { + StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); + fields.add(field); + } + StructType schema = DataTypes.createStructType(fields); + + // Convert records of the RDD (people) to Rows + JavaRDD rowRDD = peopleRDD.map(new Function() { + @Override + public Row call(String record) throws Exception { + String[] attributes = record.split(","); + return RowFactory.create(attributes[0], attributes[1].trim()); + } + }); + + // Apply the schema to the RDD + Dataset peopleDataFrame = spark.createDataFrame(rowRDD, schema); + + // Creates a temporary view using the DataFrame + peopleDataFrame.createOrReplaceTempView("people"); + + // SQL can be run over a temporary view created using DataFrames + Dataset results = spark.sql("SELECT name FROM people"); + + // The results of SQL queries are DataFrames and support all the normal RDD operations + // The columns of a row in the result can be accessed by field index or by field name + Dataset namesDS = results.map(new 
MapFunction() { + @Override + public String call(Row row) throws Exception { + return "Name: " + row.getString(0); + } + }, Encoders.STRING()); + namesDS.show(); + // +-------------+ + // | value| + // +-------------+ + // |Name: Michael| + // | Name: Andy| + // | Name: Justin| + // +-------------+ + // $example off:programmatic_schema$ + } +} diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java deleted file mode 100644 index 586d6e3a3e..0000000000 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.sql; - -// $example on:programmatic_schema$ -import java.util.ArrayList; -import java.util.List; -// $example off:programmatic_schema$ -// $example on:create_ds$ -import java.util.Arrays; -import java.util.Collections; -import java.io.Serializable; -// $example off:create_ds$ - -// $example on:schema_inferring$ -// $example on:programmatic_schema$ -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -// $example off:programmatic_schema$ -// $example on:create_ds$ -import org.apache.spark.api.java.function.MapFunction; -// $example on:create_df$ -// $example on:run_sql$ -// $example on:programmatic_schema$ -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -// $example off:programmatic_schema$ -// $example off:create_df$ -// $example off:run_sql$ -import org.apache.spark.sql.Encoder; -import org.apache.spark.sql.Encoders; -// $example off:create_ds$ -// $example off:schema_inferring$ -import org.apache.spark.sql.RowFactory; -// $example on:init_session$ -import org.apache.spark.sql.SparkSession; -// $example off:init_session$ -// $example on:programmatic_schema$ -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -// $example off:programmatic_schema$ - -// $example on:untyped_ops$ -// col("...") is preferable to df.col("...") -import static org.apache.spark.sql.functions.col; -// $example off:untyped_ops$ - -public class JavaSparkSqlExample { - // $example on:create_ds$ - public static class Person implements Serializable { - private String name; - private int age; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getAge() { - return age; - } - - public void setAge(int age) { - this.age = age; - } - } - // $example off:create_ds$ - - public static void main(String[] args) { - // $example on:init_session$ - SparkSession spark = SparkSession - .builder() - 
.appName("Java Spark SQL Example") - .config("spark.some.config.option", "some-value") - .getOrCreate(); - // $example off:init_session$ - - runBasicDataFrameExample(spark); - runDatasetCreationExample(spark); - runInferSchemaExample(spark); - runProgrammaticSchemaExample(spark); - - spark.stop(); - } - - private static void runBasicDataFrameExample(SparkSession spark) { - // $example on:create_df$ - Dataset df = spark.read().json("examples/src/main/resources/people.json"); - - // Displays the content of the DataFrame to stdout - df.show(); - // +----+-------+ - // | age| name| - // +----+-------+ - // |null|Michael| - // | 30| Andy| - // | 19| Justin| - // +----+-------+ - // $example off:create_df$ - - // $example on:untyped_ops$ - // Print the schema in a tree format - df.printSchema(); - // root - // |-- age: long (nullable = true) - // |-- name: string (nullable = true) - - // Select only the "name" column - df.select("name").show(); - // +-------+ - // | name| - // +-------+ - // |Michael| - // | Andy| - // | Justin| - // +-------+ - - // Select everybody, but increment the age by 1 - df.select(col("name"), col("age").plus(1)).show(); - // +-------+---------+ - // | name|(age + 1)| - // +-------+---------+ - // |Michael| null| - // | Andy| 31| - // | Justin| 20| - // +-------+---------+ - - // Select people older than 21 - df.filter(col("age").gt(21)).show(); - // +---+----+ - // |age|name| - // +---+----+ - // | 30|Andy| - // +---+----+ - - // Count people by age - df.groupBy("age").count().show(); - // +----+-----+ - // | age|count| - // +----+-----+ - // | 19| 1| - // |null| 1| - // | 30| 1| - // +----+-----+ - // $example off:untyped_ops$ - - // $example on:run_sql$ - // Register the DataFrame as a SQL temporary view - df.createOrReplaceTempView("people"); - - Dataset sqlDF = spark.sql("SELECT * FROM people"); - sqlDF.show(); - // +----+-------+ - // | age| name| - // +----+-------+ - // |null|Michael| - // | 30| Andy| - // | 19| Justin| - // +----+-------+ - // $example off:run_sql$ - } - - private static void runDatasetCreationExample(SparkSession spark) { - // $example on:create_ds$ - // Create an instance of a Bean class - Person person = new Person(); - person.setName("Andy"); - person.setAge(32); - - // Encoders are created for Java beans - Encoder personEncoder = Encoders.bean(Person.class); - Dataset javaBeanDS = spark.createDataset( - Collections.singletonList(person), - personEncoder - ); - javaBeanDS.show(); - // +---+----+ - // |age|name| - // +---+----+ - // | 32|Andy| - // +---+----+ - - // Encoders for most common types are provided in class Encoders - Encoder integerEncoder = Encoders.INT(); - Dataset primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); - Dataset transformedDS = primitiveDS.map(new MapFunction() { - @Override - public Integer call(Integer value) throws Exception { - return value + 1; - } - }, integerEncoder); - transformedDS.collect(); // Returns [2, 3, 4] - - // DataFrames can be converted to a Dataset by providing a class. 
Mapping based on name - String path = "examples/src/main/resources/people.json"; - Dataset peopleDS = spark.read().json(path).as(personEncoder); - peopleDS.show(); - // +----+-------+ - // | age| name| - // +----+-------+ - // |null|Michael| - // | 30| Andy| - // | 19| Justin| - // +----+-------+ - // $example off:create_ds$ - } - - private static void runInferSchemaExample(SparkSession spark) { - // $example on:schema_inferring$ - // Create an RDD of Person objects from a text file - JavaRDD peopleRDD = spark.read() - .textFile("examples/src/main/resources/people.txt") - .javaRDD() - .map(new Function() { - @Override - public Person call(String line) throws Exception { - String[] parts = line.split(","); - Person person = new Person(); - person.setName(parts[0]); - person.setAge(Integer.parseInt(parts[1].trim())); - return person; - } - }); - - // Apply a schema to an RDD of JavaBeans to get a DataFrame - Dataset peopleDF = spark.createDataFrame(peopleRDD, Person.class); - // Register the DataFrame as a temporary view - peopleDF.createOrReplaceTempView("people"); - - // SQL statements can be run by using the sql methods provided by spark - Dataset teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); - - // The columns of a row in the result can be accessed by field index - Encoder stringEncoder = Encoders.STRING(); - Dataset teenagerNamesByIndexDF = teenagersDF.map(new MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getString(0); - } - }, stringEncoder); - teenagerNamesByIndexDF.show(); - // +------------+ - // | value| - // +------------+ - // |Name: Justin| - // +------------+ - - // or by field name - Dataset teenagerNamesByFieldDF = teenagersDF.map(new MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getAs("name"); - } - }, stringEncoder); - teenagerNamesByFieldDF.show(); - // +------------+ - // | value| - // +------------+ - // |Name: Justin| - // +------------+ - // $example off:schema_inferring$ - } - - private static void runProgrammaticSchemaExample(SparkSession spark) { - // $example on:programmatic_schema$ - // Create an RDD - JavaRDD peopleRDD = spark.sparkContext() - .textFile("examples/src/main/resources/people.txt", 1) - .toJavaRDD(); - - // The schema is encoded in a string - String schemaString = "name age"; - - // Generate the schema based on the string of schema - List fields = new ArrayList<>(); - for (String fieldName : schemaString.split(" ")) { - StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); - fields.add(field); - } - StructType schema = DataTypes.createStructType(fields); - - // Convert records of the RDD (people) to Rows - JavaRDD rowRDD = peopleRDD.map(new Function() { - @Override - public Row call(String record) throws Exception { - String[] attributes = record.split(","); - return RowFactory.create(attributes[0], attributes[1].trim()); - } - }); - - // Apply the schema to the RDD - Dataset peopleDataFrame = spark.createDataFrame(rowRDD, schema); - - // Creates a temporary view using the DataFrame - peopleDataFrame.createOrReplaceTempView("people"); - - // SQL can be run over a temporary view created using DataFrames - Dataset results = spark.sql("SELECT name FROM people"); - - // The results of SQL queries are DataFrames and support all the normal RDD operations - // The columns of a row in the result can be accessed by field index or by field name - Dataset namesDS = results.map(new 
MapFunction() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getString(0); - } - }, Encoders.STRING()); - namesDS.show(); - // +-------------+ - // | value| - // +-------------+ - // |Name: Michael| - // | Name: Andy| - // | Name: Justin| - // +-------------+ - // $example off:programmatic_schema$ - } -} diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java deleted file mode 100644 index 4db5e1b0af..0000000000 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.sql; - -// $example on:schema_merging$ -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -// $example off:schema_merging$ - -// $example on:basic_parquet_example$ -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -// import org.apache.spark.sql.Encoders; -// $example on:schema_merging$ -// $example on:json_dataset$ -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -// $example off:json_dataset$ -// $example off:schema_merging$ -// $example off:basic_parquet_example$ -import org.apache.spark.sql.SparkSession; - -public class JavaSqlDataSourceExample { - - // $example on:schema_merging$ - public static class Square implements Serializable { - private int value; - private int square; - - // Getters and setters... - // $example off:schema_merging$ - public int getValue() { - return value; - } - - public void setValue(int value) { - this.value = value; - } - - public int getSquare() { - return square; - } - - public void setSquare(int square) { - this.square = square; - } - // $example on:schema_merging$ - } - // $example off:schema_merging$ - - // $example on:schema_merging$ - public static class Cube implements Serializable { - private int value; - private int cube; - - // Getters and setters... 
- // $example off:schema_merging$ - public int getValue() { - return value; - } - - public void setValue(int value) { - this.value = value; - } - - public int getCube() { - return cube; - } - - public void setCube(int cube) { - this.cube = cube; - } - // $example on:schema_merging$ - } - // $example off:schema_merging$ - - public static void main(String[] args) { - SparkSession spark = SparkSession - .builder() - .appName("Java Spark SQL Data Sources Example") - .config("spark.some.config.option", "some-value") - .getOrCreate(); - - runBasicDataSourceExample(spark); - runBasicParquetExample(spark); - runParquetSchemaMergingExample(spark); - runJsonDatasetExample(spark); - - spark.stop(); - } - - private static void runBasicDataSourceExample(SparkSession spark) { - // $example on:generic_load_save_functions$ - Dataset usersDF = spark.read().load("examples/src/main/resources/users.parquet"); - usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); - // $example off:generic_load_save_functions$ - // $example on:manual_load_options$ - Dataset peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json"); - peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); - // $example off:manual_load_options$ - // $example on:direct_sql$ - Dataset sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); - // $example off:direct_sql$ - } - - private static void runBasicParquetExample(SparkSession spark) { - // $example on:basic_parquet_example$ - Dataset peopleDF = spark.read().json("examples/src/main/resources/people.json"); - - // DataFrames can be saved as Parquet files, maintaining the schema information - peopleDF.write().parquet("people.parquet"); - - // Read in the Parquet file created above. 
- // Parquet files are self-describing so the schema is preserved - // The result of loading a parquet file is also a DataFrame - Dataset parquetFileDF = spark.read().parquet("people.parquet"); - - // Parquet files can also be used to create a temporary view and then used in SQL statements - parquetFileDF.createOrReplaceTempView("parquetFile"); - Dataset namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); - Dataset namesDS = namesDF.map(new MapFunction() { - public String call(Row row) { - return "Name: " + row.getString(0); - } - }, Encoders.STRING()); - namesDS.show(); - // +------------+ - // | value| - // +------------+ - // |Name: Justin| - // +------------+ - // $example off:basic_parquet_example$ - } - - private static void runParquetSchemaMergingExample(SparkSession spark) { - // $example on:schema_merging$ - List squares = new ArrayList<>(); - for (int value = 1; value <= 5; value++) { - Square square = new Square(); - square.setValue(value); - square.setSquare(value * value); - squares.add(square); - } - - // Create a simple DataFrame, store into a partition directory - Dataset squaresDF = spark.createDataFrame(squares, Square.class); - squaresDF.write().parquet("data/test_table/key=1"); - - List cubes = new ArrayList<>(); - for (int value = 6; value <= 10; value++) { - Cube cube = new Cube(); - cube.setValue(value); - cube.setCube(value * value * value); - cubes.add(cube); - } - - // Create another DataFrame in a new partition directory, - // adding a new column and dropping an existing column - Dataset cubesDF = spark.createDataFrame(cubes, Cube.class); - cubesDF.write().parquet("data/test_table/key=2"); - - // Read the partitioned table - Dataset mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table"); - mergedDF.printSchema(); - - // The final schema consists of all 3 columns in the Parquet files together - // with the partitioning column appeared in the partition directory paths - // root - // |-- value: int (nullable = true) - // |-- square: int (nullable = true) - // |-- cube: int (nullable = true) - // |-- key : int (nullable = true) - // $example off:schema_merging$ - } - - private static void runJsonDatasetExample(SparkSession spark) { - // $example on:json_dataset$ - // A JSON dataset is pointed to by path. - // The path can be either a single text file or a directory storing text files - Dataset people = spark.read().json("examples/src/main/resources/people.json"); - - // The inferred schema can be visualized using the printSchema() method - people.printSchema(); - // root - // |-- age: long (nullable = true) - // |-- name: string (nullable = true) - - // Creates a temporary view using the DataFrame - people.createOrReplaceTempView("people"); - - // SQL statements can be run by using the sql methods provided by spark - Dataset namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); - namesDF.show(); - // +------+ - // | name| - // +------+ - // |Justin| - // +------+ - // $example off:json_dataset$ - } - -} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala new file mode 100644 index 0000000000..0caba12af0 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql + +import org.apache.spark.sql.SparkSession + +object SQLDataSourceExample { + + case class Person(name: String, age: Long) + + def main(args: Array[String]) { + val spark = SparkSession + .builder() + .appName("Spark SQL Data Soures Example") + .config("spark.some.config.option", "some-value") + .getOrCreate() + + runBasicDataSourceExample(spark) + runBasicParquetExample(spark) + runParquetSchemaMergingExample(spark) + runJsonDatasetExample(spark) + + spark.stop() + } + + private def runBasicDataSourceExample(spark: SparkSession): Unit = { + // $example on:generic_load_save_functions$ + val usersDF = spark.read.load("examples/src/main/resources/users.parquet") + usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") + // $example off:generic_load_save_functions$ + // $example on:manual_load_options$ + val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") + peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") + // $example off:manual_load_options$ + // $example on:direct_sql$ + val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") + // $example off:direct_sql$ + } + + private def runBasicParquetExample(spark: SparkSession): Unit = { + // $example on:basic_parquet_example$ + // Encoders for most common types are automatically provided by importing spark.implicits._ + import spark.implicits._ + + val peopleDF = spark.read.json("examples/src/main/resources/people.json") + + // DataFrames can be saved as Parquet files, maintaining the schema information + peopleDF.write.parquet("people.parquet") + + // Read in the parquet file created above + // Parquet files are self-describing so the schema is preserved + // The result of loading a Parquet file is also a DataFrame + val parquetFileDF = spark.read.parquet("people.parquet") + + // Parquet files can also be used to create a temporary view and then used in SQL statements + parquetFileDF.createOrReplaceTempView("parquetFile") + val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") + namesDF.map(attributes => "Name: " + attributes(0)).show() + // +------------+ + // | value| + // +------------+ + // |Name: Justin| + // +------------+ + // $example off:basic_parquet_example$ + } + + private def runParquetSchemaMergingExample(spark: SparkSession): Unit = { + // $example on:schema_merging$ + // This is used to implicitly convert an RDD to a DataFrame. 
+ import spark.implicits._ + + // Create a simple DataFrame, store into a partition directory + val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") + squaresDF.write.parquet("data/test_table/key=1") + + // Create another DataFrame in a new partition directory, + // adding a new column and dropping an existing column + val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") + cubesDF.write.parquet("data/test_table/key=2") + + // Read the partitioned table + val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") + mergedDF.printSchema() + + // The final schema consists of all 3 columns in the Parquet files together + // with the partitioning column appeared in the partition directory paths + // root + // |-- value: int (nullable = true) + // |-- square: int (nullable = true) + // |-- cube: int (nullable = true) + // |-- key : int (nullable = true) + // $example off:schema_merging$ + } + + private def runJsonDatasetExample(spark: SparkSession): Unit = { + // $example on:json_dataset$ + // A JSON dataset is pointed to by path. + // The path can be either a single text file or a directory storing text files + val path = "examples/src/main/resources/people.json" + val peopleDF = spark.read.json(path) + + // The inferred schema can be visualized using the printSchema() method + peopleDF.printSchema() + // root + // |-- age: long (nullable = true) + // |-- name: string (nullable = true) + + // Creates a temporary view using the DataFrame + peopleDF.createOrReplaceTempView("people") + + // SQL statements can be run by using the sql methods provided by spark + val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") + teenagerNamesDF.show() + // +------+ + // | name| + // +------+ + // |Justin| + // +------+ + + // Alternatively, a DataFrame can be created for a JSON dataset represented by + // an RDD[String] storing one JSON object per string + val otherPeopleRDD = spark.sparkContext.makeRDD( + """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) + val otherPeople = spark.read.json(otherPeopleRDD) + otherPeople.show() + // +---------------+----+ + // | address|name| + // +---------------+----+ + // |[Columbus,Ohio]| Yin| + // +---------------+----+ + // $example off:json_dataset$ + } + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala new file mode 100644 index 0000000000..952c074d03 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.sql + +// $example on:schema_inferring$ +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.Encoder +// $example off:schema_inferring$ +import org.apache.spark.sql.Row +// $example on:init_session$ +import org.apache.spark.sql.SparkSession +// $example off:init_session$ +// $example on:programmatic_schema$ +// $example on:data_types$ +import org.apache.spark.sql.types._ +// $example off:data_types$ +// $example off:programmatic_schema$ + +object SparkSQLExample { + + // $example on:create_ds$ + // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, + // you can use custom classes that implement the Product interface + case class Person(name: String, age: Long) + // $example off:create_ds$ + + def main(args: Array[String]) { + // $example on:init_session$ + val spark = SparkSession + .builder() + .appName("Spark SQL Example") + .config("spark.some.config.option", "some-value") + .getOrCreate() + + // For implicit conversions like converting RDDs to DataFrames + import spark.implicits._ + // $example off:init_session$ + + runBasicDataFrameExample(spark) + runDatasetCreationExample(spark) + runInferSchemaExample(spark) + runProgrammaticSchemaExample(spark) + + spark.stop() + } + + private def runBasicDataFrameExample(spark: SparkSession): Unit = { + // $example on:create_df$ + val df = spark.read.json("examples/src/main/resources/people.json") + + // Displays the content of the DataFrame to stdout + df.show() + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:create_df$ + + // $example on:untyped_ops$ + // This import is needed to use the $-notation + import spark.implicits._ + // Print the schema in a tree format + df.printSchema() + // root + // |-- age: long (nullable = true) + // |-- name: string (nullable = true) + + // Select only the "name" column + df.select("name").show() + // +-------+ + // | name| + // +-------+ + // |Michael| + // | Andy| + // | Justin| + // +-------+ + + // Select everybody, but increment the age by 1 + df.select($"name", $"age" + 1).show() + // +-------+---------+ + // | name|(age + 1)| + // +-------+---------+ + // |Michael| null| + // | Andy| 31| + // | Justin| 20| + // +-------+---------+ + + // Select people older than 21 + df.filter($"age" > 21).show() + // +---+----+ + // |age|name| + // +---+----+ + // | 30|Andy| + // +---+----+ + + // Count people by age + df.groupBy("age").count().show() + // +----+-----+ + // | age|count| + // +----+-----+ + // | 19| 1| + // |null| 1| + // | 30| 1| + // +----+-----+ + // $example off:untyped_ops$ + + // $example on:run_sql$ + // Register the DataFrame as a SQL temporary view + df.createOrReplaceTempView("people") + + val sqlDF = spark.sql("SELECT * FROM people") + sqlDF.show() + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:run_sql$ + } + + private def runDatasetCreationExample(spark: SparkSession): Unit = { + import spark.implicits._ + // $example on:create_ds$ + // Encoders are created for case classes + val caseClassDS = Seq(Person("Andy", 32)).toDS() + caseClassDS.show() + // +----+---+ + // |name|age| + // +----+---+ + // |Andy| 32| + // +----+---+ + + // Encoders for most common types are automatically provided by importing spark.implicits._ + val primitiveDS = Seq(1, 2, 3).toDS() + 
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) + + // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name + val path = "examples/src/main/resources/people.json" + val peopleDS = spark.read.json(path).as[Person] + peopleDS.show() + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:create_ds$ + } + + private def runInferSchemaExample(spark: SparkSession): Unit = { + // $example on:schema_inferring$ + // For implicit conversions from RDDs to DataFrames + import spark.implicits._ + + // Create an RDD of Person objects from a text file, convert it to a Dataframe + val peopleDF = spark.sparkContext + .textFile("examples/src/main/resources/people.txt") + .map(_.split(",")) + .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) + .toDF() + // Register the DataFrame as a temporary view + peopleDF.createOrReplaceTempView("people") + + // SQL statements can be run by using the sql methods provided by Spark + val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") + + // The columns of a row in the result can be accessed by field index + teenagersDF.map(teenager => "Name: " + teenager(0)).show() + // +------------+ + // | value| + // +------------+ + // |Name: Justin| + // +------------+ + + // or by field name + teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() + // +------------+ + // | value| + // +------------+ + // |Name: Justin| + // +------------+ + + // No pre-defined encoders for Dataset[Map[K,V]], define explicitly + implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] + // Primitive types and case classes can be also defined as + implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder() + + // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] + teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() + // Array(Map("name" -> "Justin", "age" -> 19)) + // $example off:schema_inferring$ + } + + private def runProgrammaticSchemaExample(spark: SparkSession): Unit = { + import spark.implicits._ + // $example on:programmatic_schema$ + // Create an RDD + val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") + + // The schema is encoded in a string + val schemaString = "name age" + + // Generate the schema based on the string of schema + val fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + val schema = StructType(fields) + + // Convert records of the RDD (people) to Rows + val rowRDD = peopleRDD + .map(_.split(",")) + .map(attributes => Row(attributes(0), attributes(1).trim)) + + // Apply the schema to the RDD + val peopleDF = spark.createDataFrame(rowRDD, schema) + + // Creates a temporary view using the DataFrame + peopleDF.createOrReplaceTempView("people") + + // SQL can be run over a temporary view created using DataFrames + val results = spark.sql("SELECT name FROM people") + + // The results of SQL queries are DataFrames and support all the normal RDD operations + // The columns of a row in the result can be accessed by field index or by field name + results.map(attributes => "Name: " + attributes(0)).show() + // +-------------+ + // | value| + // +-------------+ + // |Name: Michael| + // | Name: Andy| + // | Name: Justin| + // +-------------+ + // $example 
off:programmatic_schema$ + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala deleted file mode 100644 index cf3f864267..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.sql - -// $example on:schema_inferring$ -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.Encoder -// $example off:schema_inferring$ -import org.apache.spark.sql.Row -// $example on:init_session$ -import org.apache.spark.sql.SparkSession -// $example off:init_session$ -// $example on:programmatic_schema$ -import org.apache.spark.sql.types.StringType -import org.apache.spark.sql.types.StructField -import org.apache.spark.sql.types.StructType -// $example off:programmatic_schema$ - -object SparkSqlExample { - - // $example on:create_ds$ - // Note: Case classes in Scala 2.10 can support only up to 22 fields. 
To work around this limit, - // you can use custom classes that implement the Product interface - case class Person(name: String, age: Long) - // $example off:create_ds$ - - def main(args: Array[String]) { - // $example on:init_session$ - val spark = SparkSession - .builder() - .appName("Spark SQL Example") - .config("spark.some.config.option", "some-value") - .getOrCreate() - - // For implicit conversions like converting RDDs to DataFrames - import spark.implicits._ - // $example off:init_session$ - - runBasicDataFrameExample(spark) - runDatasetCreationExample(spark) - runInferSchemaExample(spark) - runProgrammaticSchemaExample(spark) - - spark.stop() - } - - private def runBasicDataFrameExample(spark: SparkSession): Unit = { - // $example on:create_df$ - val df = spark.read.json("examples/src/main/resources/people.json") - - // Displays the content of the DataFrame to stdout - df.show() - // +----+-------+ - // | age| name| - // +----+-------+ - // |null|Michael| - // | 30| Andy| - // | 19| Justin| - // +----+-------+ - // $example off:create_df$ - - // $example on:untyped_ops$ - // This import is needed to use the $-notation - import spark.implicits._ - // Print the schema in a tree format - df.printSchema() - // root - // |-- age: long (nullable = true) - // |-- name: string (nullable = true) - - // Select only the "name" column - df.select("name").show() - // +-------+ - // | name| - // +-------+ - // |Michael| - // | Andy| - // | Justin| - // +-------+ - - // Select everybody, but increment the age by 1 - df.select($"name", $"age" + 1).show() - // +-------+---------+ - // | name|(age + 1)| - // +-------+---------+ - // |Michael| null| - // | Andy| 31| - // | Justin| 20| - // +-------+---------+ - - // Select people older than 21 - df.filter($"age" > 21).show() - // +---+----+ - // |age|name| - // +---+----+ - // | 30|Andy| - // +---+----+ - - // Count people by age - df.groupBy("age").count().show() - // +----+-----+ - // | age|count| - // +----+-----+ - // | 19| 1| - // |null| 1| - // | 30| 1| - // +----+-----+ - // $example off:untyped_ops$ - - // $example on:run_sql$ - // Register the DataFrame as a SQL temporary view - df.createOrReplaceTempView("people") - - val sqlDF = spark.sql("SELECT * FROM people") - sqlDF.show() - // +----+-------+ - // | age| name| - // +----+-------+ - // |null|Michael| - // | 30| Andy| - // | 19| Justin| - // +----+-------+ - // $example off:run_sql$ - } - - private def runDatasetCreationExample(spark: SparkSession): Unit = { - import spark.implicits._ - // $example on:create_ds$ - // Encoders are created for case classes - val caseClassDS = Seq(Person("Andy", 32)).toDS() - caseClassDS.show() - // +----+---+ - // |name|age| - // +----+---+ - // |Andy| 32| - // +----+---+ - - // Encoders for most common types are automatically provided by importing spark.implicits._ - val primitiveDS = Seq(1, 2, 3).toDS() - primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) - - // DataFrames can be converted to a Dataset by providing a class. 
Mapping will be done by name - val path = "examples/src/main/resources/people.json" - val peopleDS = spark.read.json(path).as[Person] - peopleDS.show() - // +----+-------+ - // | age| name| - // +----+-------+ - // |null|Michael| - // | 30| Andy| - // | 19| Justin| - // +----+-------+ - // $example off:create_ds$ - } - - private def runInferSchemaExample(spark: SparkSession): Unit = { - // $example on:schema_inferring$ - // For implicit conversions from RDDs to DataFrames - import spark.implicits._ - - // Create an RDD of Person objects from a text file, convert it to a Dataframe - val peopleDF = spark.sparkContext - .textFile("examples/src/main/resources/people.txt") - .map(_.split(",")) - .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) - .toDF() - // Register the DataFrame as a temporary view - peopleDF.createOrReplaceTempView("people") - - // SQL statements can be run by using the sql methods provided by Spark - val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") - - // The columns of a row in the result can be accessed by field index - teenagersDF.map(teenager => "Name: " + teenager(0)).show() - // +------------+ - // | value| - // +------------+ - // |Name: Justin| - // +------------+ - - // or by field name - teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() - // +------------+ - // | value| - // +------------+ - // |Name: Justin| - // +------------+ - - // No pre-defined encoders for Dataset[Map[K,V]], define explicitly - implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] - // Primitive types and case classes can be also defined as - implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder() - - // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] - teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() - // Array(Map("name" -> "Justin", "age" -> 19)) - // $example off:schema_inferring$ - } - - private def runProgrammaticSchemaExample(spark: SparkSession): Unit = { - import spark.implicits._ - // $example on:programmatic_schema$ - // Create an RDD - val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") - - // The schema is encoded in a string - val schemaString = "name age" - - // Generate the schema based on the string of schema - val fields = schemaString.split(" ") - .map(fieldName => StructField(fieldName, StringType, nullable = true)) - val schema = StructType(fields) - - // Convert records of the RDD (people) to Rows - val rowRDD = peopleRDD - .map(_.split(",")) - .map(attributes => Row(attributes(0), attributes(1).trim)) - - // Apply the schema to the RDD - val peopleDF = spark.createDataFrame(rowRDD, schema) - - // Creates a temporary view using the DataFrame - peopleDF.createOrReplaceTempView("people") - - // SQL can be run over a temporary view created using DataFrames - val results = spark.sql("SELECT name FROM people") - - // The results of SQL queries are DataFrames and support all the normal RDD operations - // The columns of a row in the result can be accessed by field index or by field name - results.map(attributes => "Name: " + attributes(0)).show() - // +-------------+ - // | value| - // +-------------+ - // |Name: Michael| - // | Name: Andy| - // | Name: Justin| - // +-------------+ - // $example off:programmatic_schema$ - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala 
b/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala deleted file mode 100644 index 61dea6ad2c..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.sql - -import org.apache.spark.sql.SparkSession - -object SqlDataSourceExample { - - case class Person(name: String, age: Long) - - def main(args: Array[String]) { - val spark = SparkSession - .builder() - .appName("Spark SQL Data Soures Example") - .config("spark.some.config.option", "some-value") - .getOrCreate() - - runBasicDataSourceExample(spark) - runBasicParquetExample(spark) - runParquetSchemaMergingExample(spark) - runJsonDatasetExample(spark) - - spark.stop() - } - - private def runBasicDataSourceExample(spark: SparkSession): Unit = { - // $example on:generic_load_save_functions$ - val usersDF = spark.read.load("examples/src/main/resources/users.parquet") - usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") - // $example off:generic_load_save_functions$ - // $example on:manual_load_options$ - val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") - peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") - // $example off:manual_load_options$ - // $example on:direct_sql$ - val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") - // $example off:direct_sql$ - } - - private def runBasicParquetExample(spark: SparkSession): Unit = { - // $example on:basic_parquet_example$ - // Encoders for most common types are automatically provided by importing spark.implicits._ - import spark.implicits._ - - val peopleDF = spark.read.json("examples/src/main/resources/people.json") - - // DataFrames can be saved as Parquet files, maintaining the schema information - peopleDF.write.parquet("people.parquet") - - // Read in the parquet file created above - // Parquet files are self-describing so the schema is preserved - // The result of loading a Parquet file is also a DataFrame - val parquetFileDF = spark.read.parquet("people.parquet") - - // Parquet files can also be used to create a temporary view and then used in SQL statements - parquetFileDF.createOrReplaceTempView("parquetFile") - val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") - namesDF.map(attributes => "Name: " + attributes(0)).show() - // +------------+ - // | value| - // +------------+ - // |Name: Justin| - // +------------+ - // $example off:basic_parquet_example$ - } - - private def runParquetSchemaMergingExample(spark: SparkSession): Unit = { - // $example on:schema_merging$ - // This is used 
to implicitly convert an RDD to a DataFrame. - import spark.implicits._ - - // Create a simple DataFrame, store into a partition directory - val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") - squaresDF.write.parquet("data/test_table/key=1") - - // Create another DataFrame in a new partition directory, - // adding a new column and dropping an existing column - val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") - cubesDF.write.parquet("data/test_table/key=2") - - // Read the partitioned table - val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") - mergedDF.printSchema() - - // The final schema consists of all 3 columns in the Parquet files together - // with the partitioning column appeared in the partition directory paths - // root - // |-- value: int (nullable = true) - // |-- square: int (nullable = true) - // |-- cube: int (nullable = true) - // |-- key : int (nullable = true) - // $example off:schema_merging$ - } - - private def runJsonDatasetExample(spark: SparkSession): Unit = { - // $example on:json_dataset$ - // A JSON dataset is pointed to by path. - // The path can be either a single text file or a directory storing text files - val path = "examples/src/main/resources/people.json" - val peopleDF = spark.read.json(path) - - // The inferred schema can be visualized using the printSchema() method - peopleDF.printSchema() - // root - // |-- age: long (nullable = true) - // |-- name: string (nullable = true) - - // Creates a temporary view using the DataFrame - peopleDF.createOrReplaceTempView("people") - - // SQL statements can be run by using the sql methods provided by spark - val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") - teenagerNamesDF.show() - // +------+ - // | name| - // +------+ - // |Justin| - // +------+ - - // Alternatively, a DataFrame can be created for a JSON dataset represented by - // an RDD[String] storing one JSON object per string - val otherPeopleRDD = spark.sparkContext.makeRDD( - """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) - val otherPeople = spark.read.json(otherPeopleRDD) - otherPeople.show() - // +---------------+----+ - // | address|name| - // +---------------+----+ - // |[Columbus,Ohio]| Yin| - // +---------------+----+ - // $example off:json_dataset$ - } - -} -- cgit v1.2.3
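
For readers skimming the rename, the core flow that the new `SparkSQLExample.scala` above demonstrates can be condensed as follows. This is a minimal sketch, assuming a local Spark 2.x build and the bundled `examples/src/main/resources/people.json` resource; the object name `SparkSQLExampleSketch` is illustrative only, and the full `SparkSQLExample` object added by this patch remains the canonical version.

```scala
// Condensed sketch of the flow shown in SparkSQLExample (assumes Spark SQL on the
// classpath and the bundled people.json; names here are illustrative, not from the patch).
import org.apache.spark.sql.SparkSession

object SparkSQLExampleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic sketch")
      .getOrCreate()
    // Needed for the $-notation on columns
    import spark.implicits._

    // Load a DataFrame from JSON and register it as a SQL temporary view
    val df = spark.read.json("examples/src/main/resources/people.json")
    df.createOrReplaceTempView("people")

    // Untyped DataFrame query and an equivalent-style SQL query over the same view
    df.filter($"age" > 21).show()
    spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19").show()

    spark.stop()
  }
}
```

The snippet mirrors the `create_df`, `untyped_ops`, and `run_sql` labeled regions of the renamed file; the `$example on$`/`$example off$` markers in the patch exist only so the SQL programming guide can include those regions verbatim.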