aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/java
diff options
context:
space:
mode:
authoraokolnychyi <okolnychyyanton@gmail.com>2016-07-13 16:12:05 +0800
committerCheng Lian <lian@databricks.com>2016-07-13 16:12:11 +0800
commit772c213ec702c80d0f25aa6f30b2dffebfbe2d0d (patch)
tree426dd5bea713e61cadf215ca27ac04689a1739f1 /examples/src/main/java
parent1c58fa905b6543d366d00b2e5394dfd633987f6d (diff)
downloadspark-772c213ec702c80d0f25aa6f30b2dffebfbe2d0d.tar.gz
spark-772c213ec702c80d0f25aa6f30b2dffebfbe2d0d.tar.bz2
spark-772c213ec702c80d0f25aa6f30b2dffebfbe2d0d.zip
[SPARK-16303][DOCS][EXAMPLES] Updated SQL programming guide and examples
- Hard-coded Spark SQL sample snippets were moved into source files under examples sub-project. - Removed the inconsistency between Scala and Java Spark SQL examples - Scala and Java Spark SQL examples were updated The work is still in progress. All involved examples were tested manually. An additional round of testing will be done after the code review. ![image](https://cloud.githubusercontent.com/assets/6235869/16710314/51851606-462a-11e6-9fbe-0818daef65e4.png) Author: aokolnychyi <okolnychyyanton@gmail.com> Closes #14119 from aokolnychyi/spark_16303.
Diffstat (limited to 'examples/src/main/java')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java186
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java336
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java217
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java131
4 files changed, 684 insertions, 186 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
deleted file mode 100644
index 7fc6c007b6..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.sql;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-// $example on:init_session$
-import org.apache.spark.sql.SparkSession;
-// $example off:init_session$
-
-public class JavaSparkSQL {
- public static class Person implements Serializable {
- private String name;
- private int age;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getAge() {
- return age;
- }
-
- public void setAge(int age) {
- this.age = age;
- }
- }
-
- public static void main(String[] args) throws Exception {
- // $example on:init_session$
- SparkSession spark = SparkSession
- .builder()
- .appName("JavaSparkSQL")
- .config("spark.some.config.option", "some-value")
- .getOrCreate();
- // $example off:init_session$
-
- System.out.println("=== Data source: RDD ===");
- // Load a text file and convert each line to a Java Bean.
- String file = "examples/src/main/resources/people.txt";
- JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map(
- new Function<String, Person>() {
- @Override
- public Person call(String line) {
- String[] parts = line.split(",");
-
- Person person = new Person();
- person.setName(parts[0]);
- person.setAge(Integer.parseInt(parts[1].trim()));
-
- return person;
- }
- });
-
- // Apply a schema to an RDD of Java Beans and create a temporary view
- Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class);
- schemaPeople.createOrReplaceTempView("people");
-
- // SQL can be run over RDDs which backs a temporary view.
- Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
-
- // The results of SQL queries are DataFrames and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- for (String name: teenagerNames) {
- System.out.println(name);
- }
-
- System.out.println("=== Data source: Parquet File ===");
- // DataFrames can be saved as parquet files, maintaining the schema information.
- schemaPeople.write().parquet("people.parquet");
-
- // Read in the parquet file created above.
- // Parquet files are self-describing so the schema is preserved.
- // The result of loading a parquet file is also a DataFrame.
- Dataset<Row> parquetFile = spark.read().parquet("people.parquet");
-
- // A temporary view can be created by using Parquet files and then used in SQL statements.
- parquetFile.createOrReplaceTempView("parquetFile");
- Dataset<Row> teenagers2 =
- spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
- teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- for (String name: teenagerNames) {
- System.out.println(name);
- }
-
- System.out.println("=== Data source: JSON Dataset ===");
- // A JSON dataset is pointed by path.
- // The path can be either a single text file or a directory storing text files.
- String path = "examples/src/main/resources/people.json";
- // Create a DataFrame from the file(s) pointed by path
- Dataset<Row> peopleFromJsonFile = spark.read().json(path);
-
- // Because the schema of a JSON dataset is automatically inferred, to write queries,
- // it is better to take a look at what is the schema.
- peopleFromJsonFile.printSchema();
- // The schema of people is ...
- // root
- // |-- age: IntegerType
- // |-- name: StringType
-
- // Creates a temporary view using the DataFrame
- peopleFromJsonFile.createOrReplaceTempView("people");
-
- // SQL statements can be run by using the sql methods provided by `spark`
- Dataset<Row> teenagers3 = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
-
- // The results of SQL queries are DataFrame and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) { return "Name: " + row.getString(0); }
- }).collect();
- for (String name: teenagerNames) {
- System.out.println(name);
- }
-
- // Alternatively, a DataFrame can be created for a JSON dataset represented by
- // a RDD[String] storing one JSON object per string.
- List<String> jsonData = Arrays.asList(
- "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
- JavaRDD<String> anotherPeopleRDD = spark
- .createDataFrame(jsonData, String.class).toJSON().javaRDD();
- Dataset<Row> peopleFromJsonRDD = spark.read().json(anotherPeopleRDD);
-
- // Take a look at the schema of this new DataFrame.
- peopleFromJsonRDD.printSchema();
- // The schema of anotherPeople is ...
- // root
- // |-- address: StructType
- // | |-- city: StringType
- // | |-- state: StringType
- // |-- name: StringType
-
- peopleFromJsonRDD.createOrReplaceTempView("people2");
-
- Dataset<Row> peopleWithCity = spark.sql("SELECT name, address.city FROM people2");
- List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
- @Override
- public String call(Row row) {
- return "Name: " + row.getString(0) + ", City: " + row.getString(1);
- }
- }).collect();
- for (String name: nameAndCity) {
- System.out.println(name);
- }
-
- spark.stop();
- }
-}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java
new file mode 100644
index 0000000000..586d6e3a3e
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:programmatic_schema$
+import java.util.ArrayList;
+import java.util.List;
+// $example off:programmatic_schema$
+// $example on:create_ds$
+import java.util.Arrays;
+import java.util.Collections;
+import java.io.Serializable;
+// $example off:create_ds$
+
+// $example on:schema_inferring$
+// $example on:programmatic_schema$
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+// $example off:programmatic_schema$
+// $example on:create_ds$
+import org.apache.spark.api.java.function.MapFunction;
+// $example on:create_df$
+// $example on:run_sql$
+// $example on:programmatic_schema$
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+// $example off:programmatic_schema$
+// $example off:create_df$
+// $example off:run_sql$
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+// $example off:create_ds$
+// $example off:schema_inferring$
+import org.apache.spark.sql.RowFactory;
+// $example on:init_session$
+import org.apache.spark.sql.SparkSession;
+// $example off:init_session$
+// $example on:programmatic_schema$
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+// $example off:programmatic_schema$
+
+// $example on:untyped_ops$
+// col("...") is preferable to df.col("...")
+import static org.apache.spark.sql.functions.col;
+// $example off:untyped_ops$
+
+public class JavaSparkSqlExample {
+ // $example on:create_ds$
+ public static class Person implements Serializable {
+ private String name;
+ private int age;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+ }
+ // $example off:create_ds$
+
+ public static void main(String[] args) {
+ // $example on:init_session$
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Java Spark SQL Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate();
+ // $example off:init_session$
+
+ runBasicDataFrameExample(spark);
+ runDatasetCreationExample(spark);
+ runInferSchemaExample(spark);
+ runProgrammaticSchemaExample(spark);
+
+ spark.stop();
+ }
+
+ private static void runBasicDataFrameExample(SparkSession spark) {
+ // $example on:create_df$
+ Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
+
+ // Displays the content of the DataFrame to stdout
+ df.show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_df$
+
+ // $example on:untyped_ops$
+ // Print the schema in a tree format
+ df.printSchema();
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Select only the "name" column
+ df.select("name").show();
+ // +-------+
+ // | name|
+ // +-------+
+ // |Michael|
+ // | Andy|
+ // | Justin|
+ // +-------+
+
+ // Select everybody, but increment the age by 1
+ df.select(col("name"), col("age").plus(1)).show();
+ // +-------+---------+
+ // | name|(age + 1)|
+ // +-------+---------+
+ // |Michael| null|
+ // | Andy| 31|
+ // | Justin| 20|
+ // +-------+---------+
+
+ // Select people older than 21
+ df.filter(col("age").gt(21)).show();
+ // +---+----+
+ // |age|name|
+ // +---+----+
+ // | 30|Andy|
+ // +---+----+
+
+ // Count people by age
+ df.groupBy("age").count().show();
+ // +----+-----+
+ // | age|count|
+ // +----+-----+
+ // | 19| 1|
+ // |null| 1|
+ // | 30| 1|
+ // +----+-----+
+ // $example off:untyped_ops$
+
+ // $example on:run_sql$
+ // Register the DataFrame as a SQL temporary view
+ df.createOrReplaceTempView("people");
+
+ Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
+ sqlDF.show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:run_sql$
+ }
+
+ private static void runDatasetCreationExample(SparkSession spark) {
+ // $example on:create_ds$
+ // Create an instance of a Bean class
+ Person person = new Person();
+ person.setName("Andy");
+ person.setAge(32);
+
+ // Encoders are created for Java beans
+ Encoder<Person> personEncoder = Encoders.bean(Person.class);
+ Dataset<Person> javaBeanDS = spark.createDataset(
+ Collections.singletonList(person),
+ personEncoder
+ );
+ javaBeanDS.show();
+ // +---+----+
+ // |age|name|
+ // +---+----+
+ // | 32|Andy|
+ // +---+----+
+
+ // Encoders for most common types are provided in class Encoders
+ Encoder<Integer> integerEncoder = Encoders.INT();
+ Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
+ Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
+ @Override
+ public Integer call(Integer value) throws Exception {
+ return value + 1;
+ }
+ }, integerEncoder);
+ transformedDS.collect(); // Returns [2, 3, 4]
+
+ // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
+ String path = "examples/src/main/resources/people.json";
+ Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
+ peopleDS.show();
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_ds$
+ }
+
+ private static void runInferSchemaExample(SparkSession spark) {
+ // $example on:schema_inferring$
+ // Create an RDD of Person objects from a text file
+ JavaRDD<Person> peopleRDD = spark.read()
+ .textFile("examples/src/main/resources/people.txt")
+ .javaRDD()
+ .map(new Function<String, Person>() {
+ @Override
+ public Person call(String line) throws Exception {
+ String[] parts = line.split(",");
+ Person person = new Person();
+ person.setName(parts[0]);
+ person.setAge(Integer.parseInt(parts[1].trim()));
+ return person;
+ }
+ });
+
+ // Apply a schema to an RDD of JavaBeans to get a DataFrame
+ Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
+ // Register the DataFrame as a temporary view
+ peopleDF.createOrReplaceTempView("people");
+
+ // SQL statements can be run by using the sql methods provided by spark
+ Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
+
+ // The columns of a row in the result can be accessed by field index
+ Encoder<String> stringEncoder = Encoders.STRING();
+ Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Name: " + row.getString(0);
+ }
+ }, stringEncoder);
+ teenagerNamesByIndexDF.show();
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+
+ // or by field name
+ Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Name: " + row.<String>getAs("name");
+ }
+ }, stringEncoder);
+ teenagerNamesByFieldDF.show();
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+ // $example off:schema_inferring$
+ }
+
+ private static void runProgrammaticSchemaExample(SparkSession spark) {
+ // $example on:programmatic_schema$
+ // Create an RDD
+ JavaRDD<String> peopleRDD = spark.sparkContext()
+ .textFile("examples/src/main/resources/people.txt", 1)
+ .toJavaRDD();
+
+ // The schema is encoded in a string
+ String schemaString = "name age";
+
+ // Generate the schema based on the string of schema
+ List<StructField> fields = new ArrayList<>();
+ for (String fieldName : schemaString.split(" ")) {
+ StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
+ fields.add(field);
+ }
+ StructType schema = DataTypes.createStructType(fields);
+
+ // Convert records of the RDD (people) to Rows
+ JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
+ @Override
+ public Row call(String record) throws Exception {
+ String[] attributes = record.split(",");
+ return RowFactory.create(attributes[0], attributes[1].trim());
+ }
+ });
+
+ // Apply the schema to the RDD
+ Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
+
+ // Creates a temporary view using the DataFrame
+ peopleDataFrame.createOrReplaceTempView("people");
+
+ // SQL can be run over a temporary view created using DataFrames
+ Dataset<Row> results = spark.sql("SELECT name FROM people");
+
+ // The results of SQL queries are DataFrames and support all the normal RDD operations
+ // The columns of a row in the result can be accessed by field index or by field name
+ Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Name: " + row.getString(0);
+ }
+ }, Encoders.STRING());
+ namesDS.show();
+ // +-------------+
+ // | value|
+ // +-------------+
+ // |Name: Michael|
+ // | Name: Andy|
+ // | Name: Justin|
+ // +-------------+
+ // $example off:programmatic_schema$
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java
new file mode 100644
index 0000000000..4db5e1b0af
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSqlDataSourceExample.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql;
+
+// $example on:schema_merging$
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+// $example off:schema_merging$
+
+// $example on:basic_parquet_example$
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+// import org.apache.spark.sql.Encoders;
+// $example on:schema_merging$
+// $example on:json_dataset$
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+// $example off:json_dataset$
+// $example off:schema_merging$
+// $example off:basic_parquet_example$
+import org.apache.spark.sql.SparkSession;
+
+public class JavaSqlDataSourceExample {
+
+ // $example on:schema_merging$
+ public static class Square implements Serializable {
+ private int value;
+ private int square;
+
+ // Getters and setters...
+ // $example off:schema_merging$
+ public int getValue() {
+ return value;
+ }
+
+ public void setValue(int value) {
+ this.value = value;
+ }
+
+ public int getSquare() {
+ return square;
+ }
+
+ public void setSquare(int square) {
+ this.square = square;
+ }
+ // $example on:schema_merging$
+ }
+ // $example off:schema_merging$
+
+ // $example on:schema_merging$
+ public static class Cube implements Serializable {
+ private int value;
+ private int cube;
+
+ // Getters and setters...
+ // $example off:schema_merging$
+ public int getValue() {
+ return value;
+ }
+
+ public void setValue(int value) {
+ this.value = value;
+ }
+
+ public int getCube() {
+ return cube;
+ }
+
+ public void setCube(int cube) {
+ this.cube = cube;
+ }
+ // $example on:schema_merging$
+ }
+ // $example off:schema_merging$
+
+ public static void main(String[] args) {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Java Spark SQL Data Sources Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate();
+
+ runBasicDataSourceExample(spark);
+ runBasicParquetExample(spark);
+ runParquetSchemaMergingExample(spark);
+ runJsonDatasetExample(spark);
+
+ spark.stop();
+ }
+
+ private static void runBasicDataSourceExample(SparkSession spark) {
+ // $example on:generic_load_save_functions$
+ Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
+ usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
+ // $example off:generic_load_save_functions$
+ // $example on:manual_load_options$
+ Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
+ peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
+ // $example off:manual_load_options$
+ // $example on:direct_sql$
+ Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
+ // $example off:direct_sql$
+ }
+
+ private static void runBasicParquetExample(SparkSession spark) {
+ // $example on:basic_parquet_example$
+ Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
+
+ // DataFrames can be saved as Parquet files, maintaining the schema information
+ peopleDF.write().parquet("people.parquet");
+
+ // Read in the Parquet file created above.
+ // Parquet files are self-describing so the schema is preserved
+ // The result of loading a parquet file is also a DataFrame
+ Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
+
+ // Parquet files can also be used to create a temporary view and then used in SQL statements
+ parquetFileDF.createOrReplaceTempView("parquetFile");
+ Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
+ Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
+ public String call(Row row) {
+ return "Name: " + row.getString(0);
+ }
+ }, Encoders.STRING());
+ namesDS.show();
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+ // $example off:basic_parquet_example$
+ }
+
+ private static void runParquetSchemaMergingExample(SparkSession spark) {
+ // $example on:schema_merging$
+ List<Square> squares = new ArrayList<>();
+ for (int value = 1; value <= 5; value++) {
+ Square square = new Square();
+ square.setValue(value);
+ square.setSquare(value * value);
+ squares.add(square);
+ }
+
+ // Create a simple DataFrame, store into a partition directory
+ Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
+ squaresDF.write().parquet("data/test_table/key=1");
+
+ List<Cube> cubes = new ArrayList<>();
+ for (int value = 6; value <= 10; value++) {
+ Cube cube = new Cube();
+ cube.setValue(value);
+ cube.setCube(value * value * value);
+ cubes.add(cube);
+ }
+
+ // Create another DataFrame in a new partition directory,
+ // adding a new column and dropping an existing column
+ Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
+ cubesDF.write().parquet("data/test_table/key=2");
+
+ // Read the partitioned table
+ Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
+ mergedDF.printSchema();
+
+ // The final schema consists of all 3 columns in the Parquet files together
+ // with the partitioning column appeared in the partition directory paths
+ // root
+ // |-- value: int (nullable = true)
+ // |-- square: int (nullable = true)
+ // |-- cube: int (nullable = true)
+ // |-- key : int (nullable = true)
+ // $example off:schema_merging$
+ }
+
+ private static void runJsonDatasetExample(SparkSession spark) {
+ // $example on:json_dataset$
+ // A JSON dataset is pointed to by path.
+ // The path can be either a single text file or a directory storing text files
+ Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
+
+ // The inferred schema can be visualized using the printSchema() method
+ people.printSchema();
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Creates a temporary view using the DataFrame
+ people.createOrReplaceTempView("people");
+
+ // SQL statements can be run by using the sql methods provided by spark
+ Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
+ namesDF.show();
+ // +------+
+ // | name|
+ // +------+
+ // |Justin|
+ // +------+
+ // $example off:json_dataset$
+ }
+
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
new file mode 100644
index 0000000000..493d759a91
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.hive;
+
+// $example on:spark_hive$
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+// $example off:spark_hive$
+
+public class JavaSparkHiveExample {
+
+ // $example on:spark_hive$
+ public static class Record implements Serializable {
+ private int key;
+ private String value;
+
+ public int getKey() {
+ return key;
+ }
+
+ public void setKey(int key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+ }
+ // $example off:spark_hive$
+
+ public static void main(String[] args) {
+ // $example on:spark_hive$
+ // warehouseLocation points to the default location for managed databases and tables
+ String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse";
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Java Spark Hive Example")
+ .config("spark.sql.warehouse.dir", warehouseLocation)
+ .enableHiveSupport()
+ .getOrCreate();
+
+ spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+ spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
+
+ // Queries are expressed in HiveQL
+ spark.sql("SELECT * FROM src").show();
+ // +---+-------+
+ // |key| value|
+ // +---+-------+
+ // |238|val_238|
+ // | 86| val_86|
+ // |311|val_311|
+ // ...
+
+ // Aggregation queries are also supported.
+ spark.sql("SELECT COUNT(*) FROM src").show();
+ // +--------+
+ // |count(1)|
+ // +--------+
+ // | 500 |
+ // +--------+
+
+ // The results of SQL queries are themselves DataFrames and support all normal functions.
+ Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
+
+ // The items in DaraFrames are of type Row, which lets you to access each column by ordinal.
+ Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
+ @Override
+ public String call(Row row) throws Exception {
+ return "Key: " + row.get(0) + ", Value: " + row.get(1);
+ }
+ }, Encoders.STRING());
+ stringsDS.show();
+ // +--------------------+
+ // | value|
+ // +--------------------+
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // ...
+
+ // You can also use DataFrames to create temporary views within a HiveContext.
+ List<Record> records = new ArrayList<>();
+ for (int key = 1; key < 100; key++) {
+ Record record = new Record();
+ record.setKey(key);
+ record.setValue("val_" + key);
+ records.add(record);
+ }
+ Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
+ recordsDF.createOrReplaceTempView("records");
+
+ // Queries can then join DataFrames data with data stored in Hive.
+ spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
+ // +---+------+---+------+
+ // |key| value|key| value|
+ // +---+------+---+------+
+ // | 2| val_2| 2| val_2|
+ // | 2| val_2| 2| val_2|
+ // | 4| val_4| 4| val_4|
+ // ...
+ // $example off:spark_hive$
+
+ spark.stop();
+ }
+}