author    aokolnychyi <okolnychyyanton@gmail.com>  2016-07-13 16:12:05 +0800
committer Cheng Lian <lian@databricks.com>  2016-07-13 16:12:11 +0800
commit    772c213ec702c80d0f25aa6f30b2dffebfbe2d0d (patch)
tree      426dd5bea713e61cadf215ca27ac04689a1739f1 /examples/src/main/scala
parent    1c58fa905b6543d366d00b2e5394dfd633987f6d (diff)
[SPARK-16303][DOCS][EXAMPLES] Updated SQL programming guide and examples
- Hard-coded Spark SQL sample snippets were moved into source files under the examples sub-project.
- Removed the inconsistency between the Scala and Java Spark SQL examples.
- The Scala and Java Spark SQL examples were updated.

The work is still in progress. All involved examples were tested manually. An additional round of testing will be done after the code review.

![image](https://cloud.githubusercontent.com/assets/6235869/16710314/51851606-462a-11e6-9fbe-0818daef65e4.png)

Author: aokolnychyi <okolnychyyanton@gmail.com>

Closes #14119 from aokolnychyi/spark_16303.
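The mechanism behind this change is the `// $example on:<label>$` / `// $example off:<label>$` markers that appear throughout the new sources below: the SQL programming guide pulls a labeled region out of a compiled example file instead of carrying its own hard-coded copy. A minimal sketch of how the guide side might reference such a region, assuming Spark's `include_example` Jekyll docs tag, the `create_df` label, and the `SparkSqlExample.scala` path from this commit (the exact tag and label syntax may differ):

```liquid
{% include_example create_df scala/org/apache/spark/examples/sql/SparkSqlExample.scala %}
```

At build time the docs plugin copies only the lines between the matching `$example on:create_df$` and `$example off:create_df$` markers into the rendered page, so the snippets shown in the guide stay in sync with code that actually compiles and runs.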
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala       | 254
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala  | 148
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala    |  83
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala | 107
4 files changed, 509 insertions, 83 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala
new file mode 100644
index 0000000000..cf3f864267
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSqlExample.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:schema_inferring$
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.Encoder
+// $example off:schema_inferring$
+import org.apache.spark.sql.Row
+// $example on:init_session$
+import org.apache.spark.sql.SparkSession
+// $example off:init_session$
+// $example on:programmatic_schema$
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+// $example off:programmatic_schema$
+
+object SparkSqlExample {
+
+ // $example on:create_ds$
+ // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
+ // you can use custom classes that implement the Product interface
+ case class Person(name: String, age: Long)
+ // $example off:create_ds$
+
+ def main(args: Array[String]) {
+ // $example on:init_session$
+ val spark = SparkSession
+ .builder()
+ .appName("Spark SQL Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate()
+
+ // For implicit conversions like converting RDDs to DataFrames
+ import spark.implicits._
+ // $example off:init_session$
+
+ runBasicDataFrameExample(spark)
+ runDatasetCreationExample(spark)
+ runInferSchemaExample(spark)
+ runProgrammaticSchemaExample(spark)
+
+ spark.stop()
+ }
+
+ private def runBasicDataFrameExample(spark: SparkSession): Unit = {
+ // $example on:create_df$
+ val df = spark.read.json("examples/src/main/resources/people.json")
+
+ // Displays the content of the DataFrame to stdout
+ df.show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_df$
+
+ // $example on:untyped_ops$
+ // This import is needed to use the $-notation
+ import spark.implicits._
+ // Print the schema in a tree format
+ df.printSchema()
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Select only the "name" column
+ df.select("name").show()
+ // +-------+
+ // | name|
+ // +-------+
+ // |Michael|
+ // | Andy|
+ // | Justin|
+ // +-------+
+
+ // Select everybody, but increment the age by 1
+ df.select($"name", $"age" + 1).show()
+ // +-------+---------+
+ // | name|(age + 1)|
+ // +-------+---------+
+ // |Michael| null|
+ // | Andy| 31|
+ // | Justin| 20|
+ // +-------+---------+
+
+ // Select people older than 21
+ df.filter($"age" > 21).show()
+ // +---+----+
+ // |age|name|
+ // +---+----+
+ // | 30|Andy|
+ // +---+----+
+
+ // Count people by age
+ df.groupBy("age").count().show()
+ // +----+-----+
+ // | age|count|
+ // +----+-----+
+ // | 19| 1|
+ // |null| 1|
+ // | 30| 1|
+ // +----+-----+
+ // $example off:untyped_ops$
+
+ // $example on:run_sql$
+ // Register the DataFrame as a SQL temporary view
+ df.createOrReplaceTempView("people")
+
+ val sqlDF = spark.sql("SELECT * FROM people")
+ sqlDF.show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:run_sql$
+ }
+
+ private def runDatasetCreationExample(spark: SparkSession): Unit = {
+ import spark.implicits._
+ // $example on:create_ds$
+ // Encoders are created for case classes
+ val caseClassDS = Seq(Person("Andy", 32)).toDS()
+ caseClassDS.show()
+ // +----+---+
+ // |name|age|
+ // +----+---+
+ // |Andy| 32|
+ // +----+---+
+
+ // Encoders for most common types are automatically provided by importing spark.implicits._
+ val primitiveDS = Seq(1, 2, 3).toDS()
+ primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
+
+ // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
+ val path = "examples/src/main/resources/people.json"
+ val peopleDS = spark.read.json(path).as[Person]
+ peopleDS.show()
+ // +----+-------+
+ // | age| name|
+ // +----+-------+
+ // |null|Michael|
+ // | 30| Andy|
+ // | 19| Justin|
+ // +----+-------+
+ // $example off:create_ds$
+ }
+
+ private def runInferSchemaExample(spark: SparkSession): Unit = {
+ // $example on:schema_inferring$
+ // For implicit conversions from RDDs to DataFrames
+ import spark.implicits._
+
+ // Create an RDD of Person objects from a text file, convert it to a DataFrame
+ val peopleDF = spark.sparkContext
+ .textFile("examples/src/main/resources/people.txt")
+ .map(_.split(","))
+ .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
+ .toDF()
+ // Register the DataFrame as a temporary view
+ peopleDF.createOrReplaceTempView("people")
+
+ // SQL statements can be run by using the sql methods provided by Spark
+ val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
+
+ // The columns of a row in the result can be accessed by field index
+ teenagersDF.map(teenager => "Name: " + teenager(0)).show()
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+
+ // or by field name
+ teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+
+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
+ implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
+ // Primitive types and case classes can also be defined as
+ implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
+
+ // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
+ teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
+ // Array(Map("name" -> "Justin", "age" -> 19))
+ // $example off:schema_inferring$
+ }
+
+ private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
+ import spark.implicits._
+ // $example on:programmatic_schema$
+ // Create an RDD
+ val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
+
+ // The schema is encoded in a string
+ val schemaString = "name age"
+
+ // Generate the schema based on the schema string
+ val fields = schemaString.split(" ")
+ .map(fieldName => StructField(fieldName, StringType, nullable = true))
+ val schema = StructType(fields)
+
+ // Convert records of the RDD (people) to Rows
+ val rowRDD = peopleRDD
+ .map(_.split(","))
+ .map(attributes => Row(attributes(0), attributes(1).trim))
+
+ // Apply the schema to the RDD
+ val peopleDF = spark.createDataFrame(rowRDD, schema)
+
+ // Creates a temporary view using the DataFrame
+ peopleDF.createOrReplaceTempView("people")
+
+ // SQL can be run over a temporary view created using DataFrames
+ val results = spark.sql("SELECT name FROM people")
+
+ // The results of SQL queries are DataFrames and support all the normal RDD operations
+ // The columns of a row in the result can be accessed by field index or by field name
+ results.map(attributes => "Name: " + attributes(0)).show()
+ // +-------------+
+ // | value|
+ // +-------------+
+ // |Name: Michael|
+ // | Name: Andy|
+ // | Name: Justin|
+ // +-------------+
+ // $example off:programmatic_schema$
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala
new file mode 100644
index 0000000000..61dea6ad2c
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+import org.apache.spark.sql.SparkSession
+
+object SqlDataSourceExample {
+
+ case class Person(name: String, age: Long)
+
+ def main(args: Array[String]) {
+ val spark = SparkSession
+ .builder()
+ .appName("Spark SQL Data Soures Example")
+ .config("spark.some.config.option", "some-value")
+ .getOrCreate()
+
+ runBasicDataSourceExample(spark)
+ runBasicParquetExample(spark)
+ runParquetSchemaMergingExample(spark)
+ runJsonDatasetExample(spark)
+
+ spark.stop()
+ }
+
+ private def runBasicDataSourceExample(spark: SparkSession): Unit = {
+ // $example on:generic_load_save_functions$
+ val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
+ usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
+ // $example off:generic_load_save_functions$
+ // $example on:manual_load_options$
+ val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
+ peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
+ // $example off:manual_load_options$
+ // $example on:direct_sql$
+ val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
+ // $example off:direct_sql$
+ }
+
+ private def runBasicParquetExample(spark: SparkSession): Unit = {
+ // $example on:basic_parquet_example$
+ // Encoders for most common types are automatically provided by importing spark.implicits._
+ import spark.implicits._
+
+ val peopleDF = spark.read.json("examples/src/main/resources/people.json")
+
+ // DataFrames can be saved as Parquet files, maintaining the schema information
+ peopleDF.write.parquet("people.parquet")
+
+ // Read in the parquet file created above
+ // Parquet files are self-describing so the schema is preserved
+ // The result of loading a Parquet file is also a DataFrame
+ val parquetFileDF = spark.read.parquet("people.parquet")
+
+ // Parquet files can also be used to create a temporary view and then used in SQL statements
+ parquetFileDF.createOrReplaceTempView("parquetFile")
+ val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
+ namesDF.map(attributes => "Name: " + attributes(0)).show()
+ // +------------+
+ // | value|
+ // +------------+
+ // |Name: Justin|
+ // +------------+
+ // $example off:basic_parquet_example$
+ }
+
+ private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
+ // $example on:schema_merging$
+ // This is used to implicitly convert an RDD to a DataFrame.
+ import spark.implicits._
+
+ // Create a simple DataFrame, store into a partition directory
+ val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
+ squaresDF.write.parquet("data/test_table/key=1")
+
+ // Create another DataFrame in a new partition directory,
+ // adding a new column and dropping an existing column
+ val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
+ cubesDF.write.parquet("data/test_table/key=2")
+
+ // Read the partitioned table
+ val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
+ mergedDF.printSchema()
+
+ // The final schema consists of all 3 columns in the Parquet files together
+ // with the partitioning column that appears in the partition directory paths
+ // root
+ // |-- value: int (nullable = true)
+ // |-- square: int (nullable = true)
+ // |-- cube: int (nullable = true)
+ // |-- key: int (nullable = true)
+ // $example off:schema_merging$
+ }
+
+ private def runJsonDatasetExample(spark: SparkSession): Unit = {
+ // $example on:json_dataset$
+ // A JSON dataset is pointed to by path.
+ // The path can be either a single text file or a directory storing text files
+ val path = "examples/src/main/resources/people.json"
+ val peopleDF = spark.read.json(path)
+
+ // The inferred schema can be visualized using the printSchema() method
+ peopleDF.printSchema()
+ // root
+ // |-- age: long (nullable = true)
+ // |-- name: string (nullable = true)
+
+ // Creates a temporary view using the DataFrame
+ peopleDF.createOrReplaceTempView("people")
+
+ // SQL statements can be run by using the sql methods provided by Spark
+ val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
+ teenagerNamesDF.show()
+ // +------+
+ // | name|
+ // +------+
+ // |Justin|
+ // +------+
+
+ // Alternatively, a DataFrame can be created for a JSON dataset represented by
+ // an RDD[String] storing one JSON object per string
+ val otherPeopleRDD = spark.sparkContext.makeRDD(
+ """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
+ val otherPeople = spark.read.json(otherPeopleRDD)
+ otherPeople.show()
+ // +---------------+----+
+ // | address|name|
+ // +---------------+----+
+ // |[Columbus,Ohio]| Yin|
+ // +---------------+----+
+ // $example off:json_dataset$
+ }
+
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
deleted file mode 100644
index 2343f98c8d..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.sql.hive
-
-import java.io.File
-
-import com.google.common.io.{ByteStreams, Files}
-
-import org.apache.spark.sql._
-
-object HiveFromSpark {
- case class Record(key: Int, value: String)
-
- // Copy kv1.txt file from classpath to temporary directory
- val kv1Stream = HiveFromSpark.getClass.getResourceAsStream("/kv1.txt")
- val kv1File = File.createTempFile("kv1", "txt")
- kv1File.deleteOnExit()
- ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))
-
- def main(args: Array[String]) {
- // When working with Hive, one must instantiate `SparkSession` with Hive support, including
- // connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
- // functions. Users who do not have an existing Hive deployment can still enable Hive support.
- // When not configured by the hive-site.xml, the context automatically creates `metastore_db`
- // in the current directory and creates a directory configured by `spark.sql.warehouse.dir`,
- // which defaults to the directory `spark-warehouse` in the current directory that the spark
- // application is started.
- val spark = SparkSession.builder
- .appName("HiveFromSpark")
- .enableHiveSupport()
- .getOrCreate()
-
- import spark.implicits._
- import spark.sql
-
- sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
-
- // Queries are expressed in HiveQL
- println("Result of 'SELECT *': ")
- sql("SELECT * FROM src").collect().foreach(println)
-
- // Aggregation queries are also supported.
- val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
- println(s"COUNT(*): $count")
-
- // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
- // items in the RDD are of type Row, which allows you to access each column by ordinal.
- val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
-
- println("Result of RDD.map:")
- val rddAsStrings = rddFromSql.rdd.map {
- case Row(key: Int, value: String) => s"Key: $key, Value: $value"
- }
-
- // You can also use RDDs to create temporary views within a HiveContext.
- val rdd = spark.sparkContext.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- rdd.toDF().createOrReplaceTempView("records")
-
- // Queries can then join RDD data with data stored in Hive.
- println("Result of SELECT *:")
- sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)
-
- spark.stop()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
new file mode 100644
index 0000000000..e897c2d066
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.hive
+
+// $example on:spark_hive$
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+// $example off:spark_hive$
+
+object SparkHiveExample {
+
+ // $example on:spark_hive$
+ case class Record(key: Int, value: String)
+ // $example off:spark_hive$
+
+ def main(args: Array[String]) {
+ // When working with Hive, one must instantiate `SparkSession` with Hive support, including
+ // connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
+ // functions. Users who do not have an existing Hive deployment can still enable Hive support.
+ // When not configured by the hive-site.xml, the context automatically creates `metastore_db`
+ // in the current directory and creates a directory configured by `spark.sql.warehouse.dir`,
+ // which defaults to the directory `spark-warehouse` in the current directory where the Spark
+ // application is started.
+
+ // $example on:spark_hive$
+ // warehouseLocation points to the default location for managed databases and tables
+ val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
+
+ val spark = SparkSession
+ .builder()
+ .appName("Spark Hive Example")
+ .config("spark.sql.warehouse.dir", warehouseLocation)
+ .enableHiveSupport()
+ .getOrCreate()
+
+ import spark.implicits._
+ import spark.sql
+
+ sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+ sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+
+ // Queries are expressed in HiveQL
+ sql("SELECT * FROM src").show()
+ // +---+-------+
+ // |key| value|
+ // +---+-------+
+ // |238|val_238|
+ // | 86| val_86|
+ // |311|val_311|
+ // ...
+
+ // Aggregation queries are also supported.
+ sql("SELECT COUNT(*) FROM src").show()
+ // +--------+
+ // |count(1)|
+ // +--------+
+ // |     500|
+ // +--------+
+
+ // The results of SQL queries are themselves DataFrames and support all normal functions.
+ val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
+
+ // The items in DataFrames are of type Row, which allows you to access each column by ordinal.
+ val stringsDS = sqlDF.map {
+ case Row(key: Int, value: String) => s"Key: $key, Value: $value"
+ }
+ stringsDS.show()
+ // +--------------------+
+ // | value|
+ // +--------------------+
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // |Key: 0, Value: val_0|
+ // ...
+
+ // You can also use DataFrames to create temporary views within a SparkSession.
+ val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
+ recordsDF.createOrReplaceTempView("records")
+
+ // Queries can then join DataFrame data with data stored in Hive.
+ sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
+ // +---+------+---+------+
+ // |key| value|key| value|
+ // +---+------+---+------+
+ // | 2| val_2| 2| val_2|
+ // | 2| val_2| 2| val_2|
+ // | 4| val_4| 4| val_4|
+ // ...
+ // $example off:spark_hive$
+
+ spark.stop()
+ }
+}