author     gatorsmile <gatorsmile@gmail.com>        2016-07-28 17:29:26 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-07-28 17:29:26 +0800
commit     762366fd8722f2b3fa98b8da9338b757a1821708 (patch)
tree       a9dfaf454f236cd6b6535b736bd4f0913fdaee98 /sql/core/src/test/scala
parent     5c2ae79bfcf448d8dc9217efafa1409997c739de (diff)
download   spark-762366fd8722f2b3fa98b8da9338b757a1821708.tar.gz
           spark-762366fd8722f2b3fa98b8da9338b757a1821708.tar.bz2
           spark-762366fd8722f2b3fa98b8da9338b757a1821708.zip
[SPARK-16552][SQL] Store the Inferred Schemas into External Catalog Tables when Creating Tables
#### What changes were proposed in this pull request?

Currently, in Spark SQL, the initial creation of a table schema falls into two groups. This applies to both Hive tables and data source tables.

**Group A. Users specify the schema.**

_Case 1 CREATE TABLE AS SELECT_: the schema is determined by the result schema of the SELECT clause. For example,
```SQL
CREATE TABLE tab STORED AS TEXTFILE
AS SELECT * from input
```

_Case 2 CREATE TABLE_: users explicitly specify the schema. For example,
```SQL
CREATE TABLE jsonTable (_1 string, _2 string)
USING org.apache.spark.sql.json
```

**Group B. Spark SQL infers the schema at runtime.**

_Case 3 CREATE TABLE_: users do not specify the schema, only the path to the file location. For example,
```SQL
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (path '${tempDir.getCanonicalPath}')
```

Before this PR, Spark SQL did not store the inferred schema in the external catalog for the cases in Group B. When users refresh the metadata cache, or access the table for the first time after (re-)starting Spark, Spark SQL infers the schema and stores it in the metadata cache to speed up subsequent metadata requests. However, this runtime schema inference can cause undesirable schema changes after each restart of Spark.

This PR stores the inferred schema in the external catalog when the table is created. When users want to refresh the schema after possible changes to the external files (the table location), they issue `REFRESH TABLE`. Spark SQL then infers the schema again based on the previously specified table location and updates/refreshes the schema in the external catalog and the metadata cache.

In this PR, we do not use the inferred schema to replace a user-specified schema, to avoid external behavior changes. By design, user-specified schemas (as described in Group A) can be changed by ALTER TABLE commands, although we do not support them now.

#### How was this patch tested?

TODO: add more cases to cover the changes.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14207 from gatorsmile/userSpecifiedSchema.
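As an illustration of the Group B (Case 3) flow described above, here is a minimal sketch, not part of the patch itself; the table name `events` and the path `/tmp/events` are hypothetical placeholders.

```scala
// Minimal sketch of the Group B (Case 3) flow. The table name `events` and the
// path /tmp/events are hypothetical placeholders, not taken from the patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("inferred-schema-sketch").getOrCreate()

// No schema clause: Spark SQL infers the schema from the files at `path`, and
// with this change the inferred schema is persisted in the external catalog.
spark.sql(
  """
    |CREATE TABLE events
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/events')
  """.stripMargin)

// The persisted schema is visible through ordinary metadata commands and does
// not need to be re-inferred after restarting Spark.
spark.sql("DESC events").show()

// After possible changes to the files at the table location, the user requests
// a refresh explicitly instead of relying on implicit re-inference.
spark.sql("REFRESH TABLE events")
```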
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala  210
1 file changed, 206 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a354594a6d..7bd1b0bcdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -252,6 +252,208 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
+ private def checkSchemaInCreatedDataSourceTable(
+ path: File,
+ userSpecifiedSchema: Option[String],
+ userSpecifiedPartitionCols: Option[String],
+ expectedSchema: StructType,
+ expectedPartitionCols: Seq[String]): Unit = {
+ var tableSchema = StructType(Nil)
+ var partCols = Seq.empty[String]
+
+ val tabName = "tab1"
+ withTable(tabName) {
+ val partitionClause =
+ userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
+ val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
+ sql(
+ s"""
+ |CREATE TABLE $tabName $schemaClause
+ |USING parquet
+ |OPTIONS (
+ | path '$path'
+ |)
+ |$partitionClause
+ """.stripMargin)
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+
+ tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+ partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+ }
+ assert(tableSchema == expectedSchema)
+ assert(partCols == expectedPartitionCols)
+ }
+
+ test("Create partitioned data source table without user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+ // Case 2: without schema and partitioning columns: None
+ Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
+ withTempPath { pathToPartitionedTable =>
+ df.write.format("parquet").partitionBy("num")
+ .save(pathToPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToPartitionedTable,
+ userSpecifiedSchema = None,
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
+ expectedPartitionCols = Seq("num"))
+ }
+ }
+ }
+
+ test("Create partitioned data source table with user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with partitioning columns but no schema: Option("num")
+ // Case 2: without schema and partitioning columns: None
+ Seq(Option("num"), None).foreach { partitionCols =>
+ withTempPath { pathToPartitionedTable =>
+ df.write.format("parquet").partitionBy("num")
+ .save(pathToPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToPartitionedTable,
+ userSpecifiedSchema = Option("num int, str string"),
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
+ }
+ }
+ }
+
+ test("Create non-partitioned data source table without user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+ // Case 2: without schema and partitioning columns: None
+ Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
+ withTempPath { pathToNonPartitionedTable =>
+ df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToNonPartitionedTable,
+ userSpecifiedSchema = None,
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedPartitionCols = Seq.empty[String])
+ }
+ }
+ }
+
+ test("Create non-partitioned data source table with user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+ // Case 2: without schema and partitioning columns: None
+ Seq(Option("num"), None).foreach { partitionCols =>
+ withTempPath { pathToNonPartitionedTable =>
+ df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToNonPartitionedTable,
+ userSpecifiedSchema = Option("num int, str string"),
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
+ }
+ }
+ }
+
+ test("Describe Table with Corrupted Schema") {
+ import testImplicits._
+
+ val tabName = "tab1"
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
+ df.write.format("json").save(path)
+
+ withTable(tabName) {
+ sql(
+ s"""
+ |CREATE TABLE $tabName
+ |USING json
+ |OPTIONS (
+ | path '$path'
+ |)
+ """.stripMargin)
+
+ val catalog = spark.sessionState.catalog
+ val table = catalog.getTableMetadata(TableIdentifier(tabName))
+ val newProperties = table.properties.filterKeys(key =>
+ key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS)
+ val newTable = table.copy(properties = newProperties)
+ catalog.alterTable(newTable)
+
+ val e = intercept[AnalysisException] {
+ sql(s"DESC $tabName")
+ }.getMessage
+ assert(e.contains(s"Could not read schema from the metastore because it is corrupted"))
+ }
+ }
+ }
+
+ test("Refresh table after changing the data source table partitioning") {
+ import testImplicits._
+
+ val tabName = "tab1"
+ val catalog = spark.sessionState.catalog
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i))
+ .toDF("col1", "col2", "col3", "col4")
+ df.write.format("json").partitionBy("col1", "col3").save(path)
+ val schema = new StructType()
+ .add("col2", StringType).add("col4", LongType)
+ .add("col1", IntegerType).add("col3", IntegerType)
+ val partitionCols = Seq("col1", "col3")
+
+ withTable(tabName) {
+ spark.sql(
+ s"""
+ |CREATE TABLE $tabName
+ |USING json
+ |OPTIONS (
+ | path '$path'
+ |)
+ """.stripMargin)
+ val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
+ val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+ assert(tableSchema == schema)
+ val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+ assert(partCols == partitionCols)
+
+ // Change the schema
+ val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
+ .toDF("newCol1", "newCol2")
+ newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)
+
+ // No change on the schema
+ val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
+ val tableSchemaBeforeRefresh =
+ DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh)
+ assert(tableSchemaBeforeRefresh == schema)
+ val partColsBeforeRefresh =
+ DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
+ assert(partColsBeforeRefresh == partitionCols)
+
+ // Refresh does not affect the schema
+ spark.catalog.refreshTable(tabName)
+
+ val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
+ val tableSchemaAfterRefresh =
+ DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
+ assert(tableSchemaAfterRefresh == schema)
+ val partColsAfterRefresh =
+ DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
+ assert(partColsAfterRefresh == partitionCols)
+ }
+ }
+ }
+
test("desc table for parquet data source table using in-memory catalog") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tab1"
@@ -413,7 +615,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
- Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ new StructType().add("a", IntegerType).add("b", IntegerType))
assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
Seq("a"))
}
@@ -429,7 +631,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
- Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ new StructType().add("a", IntegerType).add("b", IntegerType))
assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
Some(BucketSpec(5, Seq("a"), Seq("b"))))
}