author     Yin Huai <yhuai@databricks.com>            2015-01-13 13:01:27 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-01-13 13:01:27 -0800
commit     6463e0b9e8067cce70602c5c9006a2546856a9d6 (patch)
tree       1cd9f4f118659296334ad3c2278dfb24eb1144ac /sql/hive
parent     8ead999fd627b12837fb2f082a0e76e9d121d269 (diff)
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api
With the changes in this PR, users can persist the metadata of tables created through the data source API in the Hive metastore via DDL statements.

Author: Yin Huai <yhuai@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #3960 from yhuai/persistantTablesWithSchema2 and squashes the following commits:

069c235 [Yin Huai] Make exception messages user friendly.
c07cbc6 [Yin Huai] Get the location of test file in a correct way.
4456e98 [Yin Huai] Test data.
5315dfc [Yin Huai] rxin's comments.
7fc4b56 [Yin Huai] Add DDLStrategy and HiveDDLStrategy to plan DDLs based on the data source API.
aeaf4b3 [Yin Huai] Add comments.
06f9b0c [Yin Huai] Revert unnecessary changes.
feb88aa [Yin Huai] Merge remote-tracking branch 'apache/master' into persistantTablesWithSchema2
172db80 [Yin Huai] Fix unit test.
49bf1ac [Yin Huai] Unit tests.
8f8f1a1 [Yin Huai] [SPARK-4574][SQL] Adding support for defining schema in foreign DDL commands. #3431
f47fda1 [Yin Huai] Unit tests.
2b59723 [Michael Armbrust] Set external when creating tables
c00bb1b [Michael Armbrust] Don't use reflection to read options
1ea6e7b [Michael Armbrust] Don't fail when trying to uncache a table that doesn't exist
6edc710 [Michael Armbrust] Add tests.
d7da491 [Michael Armbrust] First draft of persistent tables.
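A minimal sketch of the user-facing flow this enables (the table name and path are illustrative; assumes a HiveContext named hiveContext), mirroring the DDL exercised in MetastoreDataSourcesSuite below:

  // Persist a data source table in the metastore via DDL; the provider named in
  // USING and the OPTIONS key/value pairs are recorded in the metastore entry.
  hiveContext.sql(
    """CREATE TABLE jsonTable
      |USING org.apache.spark.sql.json
      |OPTIONS (path '/tmp/sample.json')
    """.stripMargin)

  // The table definition survives across sessions; querying it re-resolves the
  // stored provider and options into a LogicalRelation.
  hiveContext.sql("SELECT * FROM jsonTable").collect()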
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      79
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala            11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala        21
-rw-r--r--  sql/hive/src/test/resources/sample.json                                             2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala 244
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala    1
8 files changed, 367 insertions(+), 4 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 02eac43b21..09ff4cc5ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}
+ def refreshTable(tableName: String): Unit = {
+ // TODO: Database support...
+ catalog.refreshTable("default", tableName)
+ }
+
+ protected[hive] def invalidateTable(tableName: String): Unit = {
+ // TODO: Database support...
+ catalog.invalidateTable("default", tableName)
+ }
+
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
HiveCommandStrategy(self),
+ HiveDDLStrategy,
+ DDLStrategy,
TakeOrdered,
ParquetOperations,
InMemoryScans,
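The two entry points added above can be exercised as follows; a minimal sketch (the table name is illustrative, and invalidateTable is protected[hive], so user code goes through refreshTable):

  // After the files behind the table change, re-resolve the cached relation so
  // subsequent queries see the new data and, once refreshed, the new schema.
  hiveContext.refreshTable("jsonTable")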
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c25288e000..daeabb6c8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
+
import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.ql.metadata.InvalidTableException
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.util.Utils
/* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
+ // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
+ /** A fully qualified identifier for a table (i.e., database.tableName) */
+ case class QualifiedTableName(database: String, name: String) {
+ def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+ }
+
+ /** A cache of Spark SQL data source tables that have been accessed. */
+ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
+ val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
+ override def load(in: QualifiedTableName): LogicalPlan = {
+ logDebug(s"Creating new cached data source for $in")
+ val table = client.getTable(in.database, in.name)
+ val schemaString = table.getProperty("spark.sql.sources.schema")
+ val userSpecifiedSchema =
+ if (schemaString == null) {
+ None
+ } else {
+ Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+ }
+ // It does not appear that the ql client for the metastore has a way to enumerate all the
+ // SerDe properties directly...
+ val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+ val resolvedRelation =
+ ResolvedDataSource(
+ hive,
+ userSpecifiedSchema,
+ table.getProperty("spark.sql.sources.provider"),
+ options)
+
+ LogicalRelation(resolvedRelation.relation)
+ }
+ }
+
+ CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
+ }
+
+ def refreshTable(databaseName: String, tableName: String): Unit = {
+ cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+ }
+
+ def invalidateTable(databaseName: String, tableName: String): Unit = {
+ cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
+ }
+
val caseSensitive: Boolean = false
+ def createDataSourceTable(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]) = {
+ val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+ val tbl = new Table(dbName, tblName)
+
+ tbl.setProperty("spark.sql.sources.provider", provider)
+ if (userSpecifiedSchema.isDefined) {
+ tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+ }
+ options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
+
+ tbl.setProperty("EXTERNAL", "TRUE")
+ tbl.setTableType(TableType.EXTERNAL_TABLE)
+
+ // create the table
+ synchronized {
+ client.createTable(tbl, false)
+ }
+ }
+
def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
val table = client.getTable(databaseName, tblName)
- if (table.isView) {
+
+ if (table.getProperty("spark.sql.sources.provider") != null) {
+ cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ } else if (table.isView) {
// if the unresolved relation is from hive view
// parse the text into logic node.
HiveQl.createPlanForView(table, alias)
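The relation cache added above is a Guava LoadingCache; the standalone sketch below illustrates the same load/refresh/invalidate semantics, with simplified stand-in key and value types in place of QualifiedTableName and LogicalPlan:

  import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

  case class TableKey(database: String, name: String)

  val cache: LoadingCache[TableKey, String] =
    CacheBuilder.newBuilder()
      .maximumSize(1000)
      .build(new CacheLoader[TableKey, String] {
        // Invoked only on a cache miss or an explicit refresh.
        override def load(key: TableKey): String = s"relation for ${key.database}.${key.name}"
      })

  cache.get(TableKey("default", "jsonTable"))        // loads via the CacheLoader and caches
  cache.refresh(TableKey("default", "jsonTable"))    // eagerly reloads the entry
  cache.invalidate(TableKey("default", "jsonTable")) // drops it; the next get() loads again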
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c439b9ebfe..cdff82e3d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.CreateTableUsing
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
}
}
+ object HiveDDLStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ ExecutedCommand(
+ CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+ case _ => Nil
+ }
+ }
+
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case describe: DescribeCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 1358a0eccb..31c7ce9639 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
clearCache()
loadedTables.clear()
+ catalog.cachedDataSourceTables.invalidateAll()
catalog.client.getAllTables("default").foreach { t =>
logDebug(s"Deleting table $t")
val table = catalog.client.getTable("default", t)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6b733a280e..e70cdeaad4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
@@ -52,6 +53,12 @@ case class DropTable(
override def run(sqlContext: SQLContext) = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
+ try {
+ hiveContext.tryUncacheQuery(hiveContext.table(tableName))
+ } catch {
+ case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+ }
+ hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(Seq(tableName))
Seq.empty[Row]
@@ -85,3 +92,17 @@ case class AddFile(path: String) extends RunnableCommand {
Seq.empty[Row]
}
}
+
+case class CreateMetastoreDataSource(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext) = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+ hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+ Seq.empty[Row]
+ }
+}
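Taken together with HiveDDLStrategy above, the DDL path works roughly like this; a sketch with illustrative values, assuming a HiveContext named hiveContext:

  // HiveDDLStrategy plans CREATE TABLE ... USING ... into ExecutedCommand(cmd), and
  // cmd.run(hiveContext) calls catalog.createDataSourceTable, which records the
  // provider, optional schema JSON, and options in the metastore entry.
  val cmd = CreateMetastoreDataSource(
    tableName = "jsonTable",
    userSpecifiedSchema = None,   // the data source may infer the schema
    provider = "org.apache.spark.sql.json",
    options = Map("path" -> "/tmp/sample.json"))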
diff --git a/sql/hive/src/test/resources/sample.json b/sql/hive/src/test/resources/sample.json
new file mode 100644
index 0000000000..a2c2ffd5e0
--- /dev/null
+++ b/sql/hive/src/test/resources/sample.json
@@ -0,0 +1,2 @@
+{"a" : "2" ,"b" : "blah", "c_!@(3)":1}
+{"<d>" : {"d!" : [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
new file mode 100644
index 0000000000..ec9ebb4a77
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql._
+import org.apache.spark.util.Utils
+
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+/**
+ * Tests for persisting tables created though the data sources API into the metastore.
+ */
+class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
+ override def afterEach(): Unit = {
+ reset()
+ }
+
+ val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+
+ test ("persistent JSON table") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ jsonFile(filePath).collect().toSeq)
+ }
+
+ test ("persistent JSON table with a user specified schema") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable (
+ |a string,
+ |b String,
+ |`c_!@(3)` int,
+ |`<d>` Struct<`d!`:array<int>, `=`:array<struct<Dd2: boolean>>>)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+ checkAnswer(
+ sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
+ sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+ }
+
+ test ("persistent JSON table with a user specified schema with a subset of fields") {
+ // This works because JSON objects are self-describing and JSONRelation can get needed
+ // field values based on field names.
+ sql(
+ s"""
+ |CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: boolean>>>, b String)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ val innerStruct = StructType(
+ StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil)
+ val expectedSchema = StructType(
+ StructField("<d>", innerStruct, true) ::
+ StructField("b", StringType, true) :: Nil)
+
+ assert(expectedSchema == table("jsonTable").schema)
+
+ jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+ checkAnswer(
+ sql("SELECT b, `<d>`.`=` FROM jsonTable"),
+ sql("SELECT b, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+ }
+
+ test("resolve shortened provider names") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ jsonFile(filePath).collect().toSeq)
+ }
+
+ test("drop table") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ jsonFile(filePath).collect().toSeq)
+
+ sql("DROP TABLE jsonTable")
+
+ intercept[Exception] {
+ sql("SELECT * FROM jsonTable").collect()
+ }
+ }
+
+ test("check change without refresh") {
+ val tempDir = File.createTempFile("sparksql", "json")
+ tempDir.delete()
+ sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${tempDir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a", "b") :: Nil)
+
+ FileUtils.deleteDirectory(tempDir)
+ sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ // Schema is cached so the new column does not show. The updated values in existing columns
+ // will show.
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a1", "b1") :: Nil)
+
+ refreshTable("jsonTable")
+
+ // Check that the refresh worked
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a1", "b1", "c1") :: Nil)
+ FileUtils.deleteDirectory(tempDir)
+ }
+
+ test("drop, change, recreate") {
+ val tempDir = File.createTempFile("sparksql", "json")
+ tempDir.delete()
+ sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${tempDir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a", "b") :: Nil)
+
+ FileUtils.deleteDirectory(tempDir)
+ sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ sql("DROP TABLE jsonTable")
+
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${tempDir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ // New table should reflect new schema.
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a", "b", "c") :: Nil)
+ FileUtils.deleteDirectory(tempDir)
+ }
+
+ test("invalidate cache and reload") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable (`c_!@(3)` int)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+ // Discard the cached relation.
+ invalidateTable("jsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+ invalidateTable("jsonTable")
+ val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
+
+ assert(expectedSchema == table("jsonTable").schema)
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 700a45edb1..4decd15485 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -623,7 +623,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assertResult(
Array(
- Array("# Registered as a temporary table", null, null),
Array("a", "IntegerType", null),
Array("b", "StringType", null))
) {