Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                      |   1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala       |  14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala              |   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala                     |  53
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala              |  17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala          |  30
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                |  12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       |  79
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala             |  11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                   |   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala         |  21
-rw-r--r--  sql/hive/src/test/resources/sample.json                                            |   2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  | 244
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala   |   1
14 files changed, 461 insertions, 28 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6c575dd727..e7021cc336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
DataSourceStrategy ::
+ DDLStrategy ::
TakeOrdered ::
HashAggregation ::
LeftSemiJoin ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 99b6611d3b..d91b1fbc69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
+import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}
+
+ object DDLStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
+ ExecutedCommand(
+ CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+
+ case _ => Nil
+ }
+ }
}
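
Editor's note: for illustration, a hedged sketch of how the two DDLStrategy branches behave on a plain SQLContext. The table name and JSON path are made up; the json source resolves via its DefaultSource class, as exercised in the test suites.

// Illustrative only -- assumes a plain SQLContext named sqlContext and a JSON file
// at a made-up path.

// TEMPORARY: planned as CreateTempTableUsing, which just registers a temp table.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/sample.json')
  """.stripMargin)

// Without TEMPORARY, a plain SQLContext has nowhere to persist the definition, so the
// strategy fails with "Tables created with SQLContext must be TEMPORARY. Use a
// HiveContext instead."; HiveContext installs HiveDDLStrategy to handle that case.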
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 0d765c4c92..df8e616151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -177,7 +177,6 @@ case class DescribeCommand(
override val output: Seq[Attribute]) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
- Row("# Registered as a temporary table", null, null) +:
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index fe2c4d8436..f8741e0082 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable
/**
- * `CREATE TEMPORARY TABLE avroTable
+ * `CREATE [TEMPORARY] TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
- * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
+ * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
(
- CREATE ~ TEMPORARY ~ TABLE ~> ident
+ (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
- case tableName ~ columns ~ provider ~ opts =>
+ case temp ~ tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
- CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
+ CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
}
)
@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
primitiveType
}
-private[sql] case class CreateTableUsing(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- options: Map[String, String]) extends RunnableCommand {
-
- def run(sqlContext: SQLContext) = {
+object ResolvedDataSource {
+ def apply(
+ sqlContext: SQLContext,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]): ResolvedDataSource = {
val loader = Utils.getContextOrSparkClassLoader
val clazz: Class[_] = try loader.loadClass(provider) catch {
case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case _ =>
- sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
+ sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
}
}
case None => {
clazz.newInstance match {
- case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+ case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case _ =>
- sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
+ sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
}
}
}
- sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+ new ResolvedDataSource(clazz, relation)
+ }
+}
+
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+private[sql] case class CreateTableUsing(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ temporary: Boolean,
+ options: Map[String, String]) extends Command
+
+private [sql] case class CreateTempTableUsing(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]) extends RunnableCommand {
+
+ def run(sqlContext: SQLContext) = {
+ val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
+
+ sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
Seq.empty
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 990f7e0e74..2a7be23e37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
- * Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
- * pass in the parameters specified by a user.
+ * Spark SQL is given a DDL operation with a USING clause (which names the implementing
+ * RelationProvider), this interface is used to pass in the parameters specified by the user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -46,10 +46,10 @@ trait RelationProvider {
/**
* ::DeveloperApi::
- * Implemented by objects that produce relations for a specific kind of data source. When
- * Spark SQL is given a DDL operation with
- * 1. USING clause: to specify the implemented SchemaRelationProvider
- * 2. User defined schema: users can define schema optionally when create table
+ * Implemented by objects that produce relations for a specific kind of data source
+ * with a given schema. When Spark SQL is given a DDL operation with a USING clause (which names
+ * the implementing SchemaRelationProvider) and a user-defined schema, this interface is used to
+ * pass in the parameters specified by the user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -57,6 +57,11 @@ trait RelationProvider {
* data source 'org.apache.spark.sql.json.DefaultSource'
*
 * A new instance of this class will be instantiated each time a DDL call is made.
+ *
+ * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
+ * users need to provide a schema when using a SchemaRelationProvider.
+ * A relation provider can inherit both [[RelationProvider]] and [[SchemaRelationProvider]]
+ * if it can support both schema inference and user-specified schemas.
*/
@DeveloperApi
trait SchemaRelationProvider {
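
Editor's note: as a concrete, hypothetical illustration of the combined-traits note above, here is a minimal data source sketch modeled on the test sources in TableScanSuite. It assumes the sources API as it stands in this tree (TableScan extends BaseRelation, types still under catalyst.types); none of these classes are part of this patch.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider, TableScan}

// A toy relation producing the integers from `from` to `to`, either with its own
// single-column schema or with one supplied in the DDL column list.
case class RangeScan(from: Int, to: Int, userSchema: Option[StructType])(
    @transient val sqlContext: SQLContext) extends TableScan {

  override def schema =
    userSchema.getOrElse(StructType(StructField("i", IntegerType, nullable = false) :: Nil))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}

// Mixing in both traits lets ResolvedDataSource pick the right createRelation:
// the two-argument one when no schema is given, the three-argument one otherwise.
class DefaultSource extends RelationProvider with SchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    RangeScan(parameters("from").toInt, parameters("to").toInt, None)(sqlContext)

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation =
    RangeScan(parameters("from").toInt, parameters("to").toInt, Some(schema))(sqlContext)
}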
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 605190f5ae..a1d2468b25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
sql("SELECT * FROM oneToTenDef"),
(1 to 10).map(Row(_)).toSeq)
}
+
+ test("exceptions") {
+ // Make sure we throw the correct exception when users use a relation provider that
+ // only implements RelationProvider or SchemaRelationProvider.
+ val schemaNotAllowed = intercept[Exception] {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE relationProvierWithSchema (i int)
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+
+ val schemaNeeded = intercept[Exception] {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema
+ |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 02eac43b21..09ff4cc5ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}
+ def refreshTable(tableName: String): Unit = {
+ // TODO: Database support...
+ catalog.refreshTable("default", tableName)
+ }
+
+ protected[hive] def invalidateTable(tableName: String): Unit = {
+ // TODO: Database support...
+ catalog.invalidateTable("default", tableName)
+ }
+
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
HiveCommandStrategy(self),
+ HiveDDLStrategy,
+ DDLStrategy,
TakeOrdered,
ParquetOperations,
InMemoryScans,
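
Editor's note: a small usage sketch for the new refresh hook, assuming a HiveContext instance named hiveContext and an illustrative table name. Per the TODOs above, both methods currently assume the default database.

// After the files backing a persisted data source table change on disk, ask the
// catalog to re-resolve the relation; refreshTable reloads the cached entry,
// while invalidateTable just drops it so the next access reloads lazily.
hiveContext.refreshTable("jsonTable")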
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c25288e000..daeabb6c8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
+
import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.ql.metadata.InvalidTableException
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.util.Utils
/* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
+ // TODO: Use this everywhere instead of tuples or separate databaseName, tableName arguments.
+ /** A fully qualified identifier for a table (i.e., database.tableName) */
+ case class QualifiedTableName(database: String, name: String) {
+ def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+ }
+
+ /** A cache of Spark SQL data source tables that have been accessed. */
+ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
+ val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
+ override def load(in: QualifiedTableName): LogicalPlan = {
+ logDebug(s"Creating new cached data source for $in")
+ val table = client.getTable(in.database, in.name)
+ val schemaString = table.getProperty("spark.sql.sources.schema")
+ val userSpecifiedSchema =
+ if (schemaString == null) {
+ None
+ } else {
+ Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+ }
+ // It does not appear that the ql client for the metastore has a way to enumerate all the
+ // SerDe properties directly...
+ val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+ val resolvedRelation =
+ ResolvedDataSource(
+ hive,
+ userSpecifiedSchema,
+ table.getProperty("spark.sql.sources.provider"),
+ options)
+
+ LogicalRelation(resolvedRelation.relation)
+ }
+ }
+
+ CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
+ }
+
+ def refreshTable(databaseName: String, tableName: String): Unit = {
+ cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+ }
+
+ def invalidateTable(databaseName: String, tableName: String): Unit = {
+ cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
+ }
+
val caseSensitive: Boolean = false
+ def createDataSourceTable(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]) = {
+ val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+ val tbl = new Table(dbName, tblName)
+
+ tbl.setProperty("spark.sql.sources.provider", provider)
+ if (userSpecifiedSchema.isDefined) {
+ tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+ }
+ options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
+
+ tbl.setProperty("EXTERNAL", "TRUE")
+ tbl.setTableType(TableType.EXTERNAL_TABLE)
+
+ // create the table
+ synchronized {
+ client.createTable(tbl, false)
+ }
+ }
+
def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
val table = client.getTable(databaseName, tblName)
- if (table.isView) {
+
+ if (table.getProperty("spark.sql.sources.provider") != null) {
+ cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+ } else if (table.isView) {
// if the unresolved relation is from hive view
// parse the text into logic node.
HiveQl.createPlanForView(table, alias)
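
Editor's note: the cachedDataSourceTables field above uses the standard Guava LoadingCache pattern. A self-contained sketch of that pattern follows, with illustrative key/value types rather than the ones in the patch.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

case class TableKey(database: String, name: String)

// Expensive resolution work lives in load(); the cache bounds how many results are kept.
val cache: LoadingCache[TableKey, String] = CacheBuilder.newBuilder()
  .maximumSize(1000)
  .build(new CacheLoader[TableKey, String]() {
    override def load(key: TableKey): String = s"resolved ${key.database}.${key.name}"
  })

cache.get(TableKey("default", "jsonTable"))        // loads on first access, cached afterwards
cache.refresh(TableKey("default", "jsonTable"))    // eagerly re-runs load() (refreshTable)
cache.invalidate(TableKey("default", "jsonTable")) // drops the entry (invalidateTable / DROP TABLE)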
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c439b9ebfe..cdff82e3d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.CreateTableUsing
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
}
}
+ object HiveDDLStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ ExecutedCommand(
+ CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+ case _ => Nil
+ }
+ }
+
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case describe: DescribeCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 1358a0eccb..31c7ce9639 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
clearCache()
loadedTables.clear()
+ catalog.cachedDataSourceTables.invalidateAll()
catalog.client.getAllTables("default").foreach { t =>
logDebug(s"Deleting table $t")
val table = catalog.client.getTable("default", t)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6b733a280e..e70cdeaad4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
@@ -52,6 +53,12 @@ case class DropTable(
override def run(sqlContext: SQLContext) = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
+ try {
+ hiveContext.tryUncacheQuery(hiveContext.table(tableName))
+ } catch {
+ case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+ }
+ hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(Seq(tableName))
Seq.empty[Row]
@@ -85,3 +92,17 @@ case class AddFile(path: String) extends RunnableCommand {
Seq.empty[Row]
}
}
+
+case class CreateMetastoreDataSource(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext) = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+ hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+ Seq.empty[Row]
+ }
+}
diff --git a/sql/hive/src/test/resources/sample.json b/sql/hive/src/test/resources/sample.json
new file mode 100644
index 0000000000..a2c2ffd5e0
--- /dev/null
+++ b/sql/hive/src/test/resources/sample.json
@@ -0,0 +1,2 @@
+{"a" : "2" ,"b" : "blah", "c_!@(3)":1}
+{"<d>" : {"d!" : [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
new file mode 100644
index 0000000000..ec9ebb4a77
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql._
+import org.apache.spark.util.Utils
+
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+/**
+ * Tests for persisting tables created through the data sources API into the metastore.
+ */
+class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
+ override def afterEach(): Unit = {
+ reset()
+ }
+
+ val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+
+ test ("persistent JSON table") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ jsonFile(filePath).collect().toSeq)
+ }
+
+ test ("persistent JSON table with a user specified schema") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable (
+ |a string,
+ |b String,
+ |`c_!@(3)` int,
+ |`<d>` Struct<`d!`:array<int>, `=`:array<struct<Dd2: boolean>>>)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+ checkAnswer(
+ sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
+ sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+ }
+
+ test ("persistent JSON table with a user specified schema with a subset of fields") {
+ // This works because JSON objects are self-describing and JSONRelation can get needed
+ // field values based on field names.
+ sql(
+ s"""
+ |CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: boolean>>>, b String)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ val innerStruct = StructType(
+ StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil)
+ val expectedSchema = StructType(
+ StructField("<d>", innerStruct, true) ::
+ StructField("b", StringType, true) :: Nil)
+
+ assert(expectedSchema == table("jsonTable").schema)
+
+ jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+ checkAnswer(
+ sql("SELECT b, `<d>`.`=` FROM jsonTable"),
+ sql("SELECT b, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+ }
+
+ test("resolve shortened provider names") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ jsonFile(filePath).collect().toSeq)
+ }
+
+ test("drop table") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ jsonFile(filePath).collect().toSeq)
+
+ sql("DROP TABLE jsonTable")
+
+ intercept[Exception] {
+ sql("SELECT * FROM jsonTable").collect()
+ }
+ }
+
+ test("check change without refresh") {
+ val tempDir = File.createTempFile("sparksql", "json")
+ tempDir.delete()
+ sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${tempDir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a", "b") :: Nil)
+
+ FileUtils.deleteDirectory(tempDir)
+ sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ // Schema is cached so the new column does not show. The updated values in existing columns
+ // will show.
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a1", "b1") :: Nil)
+
+ refreshTable("jsonTable")
+
+ // Check that the refresh worked
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a1", "b1", "c1") :: Nil)
+ FileUtils.deleteDirectory(tempDir)
+ }
+
+ test("drop, change, recreate") {
+ val tempDir = File.createTempFile("sparksql", "json")
+ tempDir.delete()
+ sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${tempDir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a", "b") :: Nil)
+
+ FileUtils.deleteDirectory(tempDir)
+ sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+ sql("DROP TABLE jsonTable")
+
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING org.apache.spark.sql.json
+ |OPTIONS (
+ | path '${tempDir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ // New table should reflect new schema.
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ ("a", "b", "c") :: Nil)
+ FileUtils.deleteDirectory(tempDir)
+ }
+
+ test("invalidate cache and reload") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable (`c_!@(3)` int)
+ |USING org.apache.spark.sql.json.DefaultSource
+ |OPTIONS (
+ | path '${filePath}'
+ |)
+ """.stripMargin)
+
+ jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+ // Discard the cached relation.
+ invalidateTable("jsonTable")
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+ invalidateTable("jsonTable")
+ val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
+
+ assert(expectedSchema == table("jsonTable").schema)
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 700a45edb1..4decd15485 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -623,7 +623,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assertResult(
Array(
- Array("# Registered as a temporary table", null, null),
Array("a", "IntegerType", null),
Array("b", "StringType", null))
) {