Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  84
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala  36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala  210
6 files changed, 286 insertions(+), 74 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5e3cd9f895..fa3967c676 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
-import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types._
/**
@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
- partitionColumns: Array[String],
+ userSpecifiedPartitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
@@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
}
// Create the relation to validate the arguments before writing the metadata to the metastore.
- DataSource(
- sparkSession = sparkSession,
- userSpecifiedSchema = userSpecifiedSchema,
- className = provider,
- bucketSpec = None,
- options = optionsWithPath).resolveRelation(checkPathExist = false)
+ val dataSource: BaseRelation =
+ DataSource(
+ sparkSession = sparkSession,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = provider,
+ bucketSpec = None,
+ options = optionsWithPath).resolveRelation(checkPathExist = false)
+
+ val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
+ userSpecifiedPartitionColumns
+ } else {
+ val res = dataSource match {
+ case r: HadoopFsRelation => r.partitionSchema.fieldNames
+ case _ => Array.empty[String]
+ }
+ if (userSpecifiedPartitionColumns.length > 0) {
+ // The table does not have a specified schema, which means that the schema will be inferred
+ // when we load the table. So, we are not expecting partition columns and we will discover
+ // partitions when we load the table. However, if there are specified partition columns,
+ // we simply ignore them and provide a warning message.
+ logWarning(
+ s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
+ s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
+ s"Schema: ${dataSource.schema.simpleString}; " +
+ s"Partition columns: ${res.mkString("(", ", ", ")")}")
+ }
+ res
+ }
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
- userSpecifiedSchema = userSpecifiedSchema,
+ schema = dataSource.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -213,7 +235,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
existingSchema = Some(l.schema)
case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
- existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata)
+ existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -256,7 +278,7 @@ case class CreateDataSourceTableAsSelectCommand(
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
- userSpecifiedSchema = Some(result.schema),
+ schema = result.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -306,7 +328,7 @@ object CreateDataSourceTableUtils extends Logging {
def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
- userSpecifiedSchema: Option[StructType],
+ schema: StructType,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
@@ -315,28 +337,26 @@ object CreateDataSourceTableUtils extends Logging {
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)
- // Saves optional user specified schema. Serialized JSON schema string may be too long to be
- // stored into a single metastore SerDe property. In this case, we split the JSON string and
- // store each part as a separate SerDe property.
- userSpecifiedSchema.foreach { schema =>
- val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
- val schemaJsonString = schema.json
- // Split the JSON string.
- val parts = schemaJsonString.grouped(threshold).toSeq
- tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
- parts.zipWithIndex.foreach { case (part, index) =>
- tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
- }
+ // Serialized JSON schema string may be too long to be stored into a single metastore table
+ // property. In this case, we split the JSON string and store each part as a separate table
+ // property.
+ val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
+ val schemaJsonString = schema.json
+ // Split the JSON string.
+ val parts = schemaJsonString.grouped(threshold).toSeq
+ tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+ parts.zipWithIndex.foreach { case (part, index) =>
+ tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
- if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+ if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}
- if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+ if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
@@ -353,16 +373,6 @@ object CreateDataSourceTableUtils extends Logging {
}
}
- if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
- // The table does not have a specified schema, which means that the schema will be inferred
- // when we load the table. So, we are not expecting partition columns and we will discover
- // partitions when we load the table. However, if there are specified partition columns,
- // we simply ignore them and provide a warning message.
- logWarning(
- s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
- s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
- }
-
val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
@@ -375,7 +385,7 @@ object CreateDataSourceTableUtils extends Logging {
val dataSource =
DataSource(
sparkSession,
- userSpecifiedSchema = userSpecifiedSchema,
+ userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
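For context on the createDataSourceTables.scala hunks above: the schema is now always persisted by splitting its JSON form across numbered table properties. Below is a minimal, self-contained sketch of that write path, not the patched code itself; the 4000-character threshold and the short property keys are illustrative stand-ins for the schemaStringLengthThreshold conf, DATASOURCE_SCHEMA_NUMPARTS and DATASOURCE_SCHEMA_PART_PREFIX.

    import scala.collection.mutable
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    val schema = new StructType().add("num", IntegerType).add("str", StringType)
    val threshold = 4000                             // stand-in for schemaStringLengthThreshold
    val tableProperties = new mutable.HashMap[String, String]

    // Split the JSON schema string into chunks that each fit into one table property.
    val parts = schema.json.grouped(threshold).toSeq
    tableProperties.put("schema.numParts", parts.size.toString)  // stand-in for DATASOURCE_SCHEMA_NUMPARTS
    parts.zipWithIndex.foreach { case (part, index) =>
      tableProperties.put(s"schema.part.$index", part)           // stand-in for DATASOURCE_SCHEMA_PART_PREFIX
    }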
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 03f81c46a8..7e99593fbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -521,31 +521,29 @@ object DDLUtils {
table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
}
- // A persisted data source table may not store its schema in the catalog. In this case, its schema
- // will be inferred at runtime when the table is referenced.
- def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+ // A persisted data source table always stores its schema in the catalog.
+ def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
require(isDatasourceTable(metadata))
+ val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
val props = metadata.properties
- if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
+ props.get(DATASOURCE_SCHEMA).map { schema =>
// Originally, we used spark.sql.sources.schema to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using spark.sql.sources.schema any more, we still need to support it.
- props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
- } else {
- metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
+ DataType.fromJson(schema).asInstanceOf[StructType]
+ } getOrElse {
+ props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
- throw new AnalysisException(
- "Could not read schema from the metastore because it is corrupted " +
- s"(missing part $index of the schema, $numParts parts are expected).")
+ throw new AnalysisException(msgSchemaCorrupted +
+ s" (missing part $index of the schema, $numParts parts are expected).")
}
-
part
}
// Stick all parts back to a single schema string.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
- }
+ } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
}
}
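The rewritten DDLUtils.getSchemaFromTableProperties above always returns a StructType and reads it back with a fixed precedence: the legacy single-property key kept for pre-SPARK-6024 tables, then the multi-part properties, otherwise the corrupted-schema error. Here is a condensed sketch of that lookup order, using the same illustrative property keys as the write-path sketch earlier; sys.error stands in for the AnalysisException thrown by the real method.

    import org.apache.spark.sql.types.{DataType, StructType}

    def schemaFromProps(props: Map[String, String]): StructType = {
      val corrupted = "Could not read schema from the metastore because it is corrupted."
      props.get("schema")                                 // legacy DATASOURCE_SCHEMA key
        .map(json => DataType.fromJson(json).asInstanceOf[StructType])
        .orElse {
          props.get("schema.numParts").map { numParts =>  // DATASOURCE_SCHEMA_NUMPARTS
            val json = (0 until numParts.toInt).map { i =>
              props.getOrElse(s"schema.part.$i",          // DATASOURCE_SCHEMA_PART_PREFIX
                sys.error(corrupted + s" (missing part $i of the schema)"))
            }.mkString
            DataType.fromJson(json).asInstanceOf[StructType]
          }
        }
        .getOrElse(sys.error(corrupted))
    }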
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 82633803fa..f85373c751 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -416,15 +416,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
} else {
val metadata = catalog.getTableMetadata(table)
- if (DDLUtils.isDatasourceTable(metadata)) {
- DDLUtils.getSchemaFromTableProperties(metadata) match {
- case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
- case None => describeSchema(catalog.lookupRelation(table).schema, result)
- }
- } else {
- describeSchema(metadata.schema, result)
- }
-
+ describeSchema(metadata, result)
if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
@@ -439,12 +431,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
if (DDLUtils.isDatasourceTable(table)) {
- val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table)
- for (schema <- userSpecifiedSchema if partColNames.nonEmpty) {
+ if (partColNames.nonEmpty) {
+ val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
- describeSchema(StructType(partColNames.map(schema(_))), buffer)
+ describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
}
} else {
if (table.partitionColumns.nonEmpty) {
@@ -518,6 +510,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}
+ private def describeSchema(
+ tableDesc: CatalogTable,
+ buffer: ArrayBuffer[Row]): Unit = {
+ if (DDLUtils.isDatasourceTable(tableDesc)) {
+ val schema = DDLUtils.getSchemaFromTableProperties(tableDesc)
+ describeSchema(schema, buffer)
+ } else {
+ describeSchema(tableDesc.schema, buffer)
+ }
+ }
+
private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -876,12 +879,9 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showDataSourceTableDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
- DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
- val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
- builder ++= columns.mkString("(", ", ", ")")
- }
-
- builder ++= "\n"
+ val schema = DDLUtils.getSchemaFromTableProperties(metadata)
+ val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+ builder ++= columns.mkString("(", ", ", ")\n")
}
private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
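ShowCreateTableCommand above now always emits the column list from the persisted schema. A tiny illustrative sketch of the string it builds, reusing the catalyst quoteIdentifier and DataType.sql helpers referenced in the hunk (the two-column schema is hypothetical):

    import org.apache.spark.sql.catalyst.util.quoteIdentifier
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    val schema = new StructType().add("num", IntegerType).add("str", StringType)
    val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
    println(columns.mkString("(", ", ", ")\n"))   // prints (`num` INT, `str` STRING) plus a newline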
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8ffdc507db..ca03b26e85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -205,7 +205,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi
*/
private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
- val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
+ val schema = DDLUtils.getSchemaFromTableProperties(table)
// We only need the names here since the schema we loaded from the metastore
// contains partition columns. We can always get datatypes of partitioning columns
@@ -218,7 +218,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
val dataSource =
DataSource(
sparkSession,
- userSpecifiedSchema = userSpecifiedSchema,
+ userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 05dfb8cb22..5393b76161 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -352,13 +352,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
/**
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
- * is refreshed.
+ * is refreshed. For data source tables, the schema will not be re-inferred or refreshed.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
+ // Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)
// If this table is cached as an InMemoryRelation, drop the original
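A short usage sketch of the refreshTable behavior documented above (the table name tab1 is hypothetical, spark is the active SparkSession, and the table's files are assumed to have been rewritten outside Spark):

    // Drop any cached data/metadata for the table so the next scan re-reads the files.
    spark.catalog.refreshTable("tab1")
    // For a data source table the schema persisted in the catalog is not re-inferred here;
    // subsequent reads keep using the stored schema, as the DDLSuite test below verifies.
    spark.table("tab1").show()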
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a354594a6d..7bd1b0bcdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -252,6 +252,208 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
+ private def checkSchemaInCreatedDataSourceTable(
+ path: File,
+ userSpecifiedSchema: Option[String],
+ userSpecifiedPartitionCols: Option[String],
+ expectedSchema: StructType,
+ expectedPartitionCols: Seq[String]): Unit = {
+ var tableSchema = StructType(Nil)
+ var partCols = Seq.empty[String]
+
+ val tabName = "tab1"
+ withTable(tabName) {
+ val partitionClause =
+ userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
+ val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
+ sql(
+ s"""
+ |CREATE TABLE $tabName $schemaClause
+ |USING parquet
+ |OPTIONS (
+ | path '$path'
+ |)
+ |$partitionClause
+ """.stripMargin)
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+
+ tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+ partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+ }
+ assert(tableSchema == expectedSchema)
+ assert(partCols == expectedPartitionCols)
+ }
+
+ test("Create partitioned data source table without user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+ // Case 2: without schema and partitioning columns: None
+ Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
+ withTempPath { pathToPartitionedTable =>
+ df.write.format("parquet").partitionBy("num")
+ .save(pathToPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToPartitionedTable,
+ userSpecifiedSchema = None,
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
+ expectedPartitionCols = Seq("num"))
+ }
+ }
+ }
+
+ test("Create partitioned data source table with user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with user-specified schema and partitioning columns: Option("num")
+ // Case 2: with user-specified schema but no partitioning columns: None
+ Seq(Option("num"), None).foreach { partitionCols =>
+ withTempPath { pathToPartitionedTable =>
+ df.write.format("parquet").partitionBy("num")
+ .save(pathToPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToPartitionedTable,
+ userSpecifiedSchema = Option("num int, str string"),
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
+ }
+ }
+ }
+
+ test("Create non-partitioned data source table without user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with partitioning columns but no schema: Option("inexistentColumns")
+ // Case 2: without schema and partitioning columns: None
+ Seq(Option("inexistentColumns"), None).foreach { partitionCols =>
+ withTempPath { pathToNonPartitionedTable =>
+ df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToNonPartitionedTable,
+ userSpecifiedSchema = None,
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedPartitionCols = Seq.empty[String])
+ }
+ }
+ }
+
+ test("Create non-partitioned data source table with user specified schema") {
+ import testImplicits._
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+
+ // Case 1: with user-specified schema and partitioning columns: Option("num")
+ // Case 2: with user-specified schema but no partitioning columns: None
+ Seq(Option("num"), None).foreach { partitionCols =>
+ withTempPath { pathToNonPartitionedTable =>
+ df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
+ checkSchemaInCreatedDataSourceTable(
+ pathToNonPartitionedTable,
+ userSpecifiedSchema = Option("num int, str string"),
+ userSpecifiedPartitionCols = partitionCols,
+ expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
+ }
+ }
+ }
+
+ test("Describe Table with Corrupted Schema") {
+ import testImplicits._
+
+ val tabName = "tab1"
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
+ df.write.format("json").save(path)
+
+ withTable(tabName) {
+ sql(
+ s"""
+ |CREATE TABLE $tabName
+ |USING json
+ |OPTIONS (
+ | path '$path'
+ |)
+ """.stripMargin)
+
+ val catalog = spark.sessionState.catalog
+ val table = catalog.getTableMetadata(TableIdentifier(tabName))
+ val newProperties = table.properties.filterKeys(key =>
+ key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS)
+ val newTable = table.copy(properties = newProperties)
+ catalog.alterTable(newTable)
+
+ val e = intercept[AnalysisException] {
+ sql(s"DESC $tabName")
+ }.getMessage
+ assert(e.contains(s"Could not read schema from the metastore because it is corrupted"))
+ }
+ }
+ }
+
+ test("Refresh table after changing the data source table partitioning") {
+ import testImplicits._
+
+ val tabName = "tab1"
+ val catalog = spark.sessionState.catalog
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i))
+ .toDF("col1", "col2", "col3", "col4")
+ df.write.format("json").partitionBy("col1", "col3").save(path)
+ val schema = new StructType()
+ .add("col2", StringType).add("col4", LongType)
+ .add("col1", IntegerType).add("col3", IntegerType)
+ val partitionCols = Seq("col1", "col3")
+
+ withTable(tabName) {
+ spark.sql(
+ s"""
+ |CREATE TABLE $tabName
+ |USING json
+ |OPTIONS (
+ | path '$path'
+ |)
+ """.stripMargin)
+ val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
+ val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
+ assert(tableSchema == schema)
+ val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+ assert(partCols == partitionCols)
+
+ // Change the schema
+ val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
+ .toDF("newCol1", "newCol2")
+ newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)
+
+ // No change to the schema in the catalog
+ val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
+ val tableSchemaBeforeRefresh =
+ DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh)
+ assert(tableSchemaBeforeRefresh == schema)
+ val partColsBeforeRefresh =
+ DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
+ assert(partColsBeforeRefresh == partitionCols)
+
+ // Refresh does not affect the schema
+ spark.catalog.refreshTable(tabName)
+
+ val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
+ val tableSchemaAfterRefresh =
+ DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
+ assert(tableSchemaAfterRefresh == schema)
+ val partColsAfterRefresh =
+ DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
+ assert(partColsAfterRefresh == partitionCols)
+ }
+ }
+ }
+
test("desc table for parquet data source table using in-memory catalog") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tab1"
@@ -413,7 +615,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
- Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ new StructType().add("a", IntegerType).add("b", IntegerType))
assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
Seq("a"))
}
@@ -429,7 +631,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
- Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+ new StructType().add("a", IntegerType).add("b", IntegerType))
assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
Some(BucketSpec(5, Seq("a"), Seq("b"))))
}