author     gatorsmile <gatorsmile@gmail.com>    2016-08-26 11:13:38 -0700
committer  Yin Huai <yhuai@databricks.com>      2016-08-26 11:13:38 -0700
commit     fd4ba3f626f49d7d616a2a334d45b1c736e1db1c (patch)
tree       b1157e6cb57bfe55d1db6776b6f07c2e441fb5e4 /sql
parent     18832162357282ec81515b5b2ba93747be3ad18b (diff)
[SPARK-17192][SQL] Issue Exception when Users Specify the Partitioning Columns without a Given Schema
### What changes were proposed in this pull request?

Address the comments by yhuai in the original PR: https://github.com/apache/spark/pull/14207

First, issue an exception instead of logging a warning when users specify the partitioning columns without a given schema. Second, refactor the code a little.

### How was this patch tested?

Fixed the test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14572 from gatorsmile/followup16552.
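For context, a minimal, self-contained sketch of the user-facing change (not part of the patch; the table name, path, and local SparkSession setup are made up): a `CREATE TABLE` with `PARTITIONED BY` but no schema, which previously only logged a warning and had its partition columns dropped, now fails analysis.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object PartitionWithoutSchemaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-17192 demo")
      .master("local[*]")
      .getOrCreate()
    try {
      // No schema is given, so it would have to be inferred at runtime; after
      // this patch the explicit PARTITIONED BY clause is rejected rather than
      // silently ignored.
      spark.sql(
        """
          |CREATE TABLE tab_demo
          |USING parquet
          |OPTIONS (path '/tmp/tab_demo_data')
          |PARTITIONED BY (part_col)
        """.stripMargin)
    } catch {
      case e: AnalysisException =>
        // Expected message from the new rule: "It is not allowed to specify
        // partition columns when the table schema is not defined. ..."
        println(s"Analysis failed as expected: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}
```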
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala   25
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala    17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala      16
3 files changed, 29 insertions, 29 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5eb2f0a9ff..f14c63c19f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -72,29 +72,20 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// When we CREATE TABLE without specifying the table schema, we should fail the query if
- // bucketing information is specified, as we can't infer bucketing from data files currently,
- // and we should ignore the partition columns if it's specified, as we will infer it later, at
- // runtime.
+ // bucketing information is specified, as we can't infer bucketing from data files currently.
+ // Since the runtime inferred partition columns could be different from what user specified,
+ // we fail the query if the partitioning information is specified.
case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
if (tableDesc.bucketSpec.isDefined) {
failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
"when creating and will be inferred at runtime")
}
-
- val partitionColumnNames = tableDesc.partitionColumnNames
- if (partitionColumnNames.nonEmpty) {
- // The table does not have a specified schema, which means that the schema will be inferred
- // at runtime. So, we are not expecting partition columns and we will discover partitions
- // at runtime. However, if there are specified partition columns, we simply ignore them and
- // provide a warning message.
- logWarning(
- s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
- s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
- "be inferred.")
- c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
- } else {
- c
+ if (tableDesc.partitionColumnNames.nonEmpty) {
+ failAnalysis("It is not allowed to specify partition columns when the table schema is " +
+ "not defined. When the table schema is not provided, schema and partition columns " +
+ "will be inferred.")
}
+ c
// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
// config, and do various checks:
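Restated outside the rule for readability (this is not the Spark code; `TableDesc` and `failAnalysis` below are simplified, hypothetical stand-ins for `CatalogTable` and the analyzer helper), the new validation in `PreprocessDDL` boils down to:

```scala
object CreateTableChecks {
  // Simplified, hypothetical stand-in for the real table description.
  case class TableDesc(
      schemaFieldNames: Seq[String],     // empty means "schema will be inferred at runtime"
      bucketSpec: Option[String],
      partitionColumnNames: Seq[String])

  // Stand-in for the analyzer's failAnalysis helper.
  def failAnalysis(msg: String): Nothing = throw new IllegalArgumentException(msg)

  def validate(tableDesc: TableDesc): Unit = {
    if (tableDesc.schemaFieldNames.isEmpty) {
      // Bucketing cannot be inferred from data files, so it is rejected outright.
      if (tableDesc.bucketSpec.isDefined) {
        failAnalysis("Cannot specify bucketing information if the table schema is not specified")
      }
      // Partition columns inferred at runtime could differ from what the user
      // wrote, so after this patch they are rejected too, instead of being ignored.
      if (tableDesc.partitionColumnNames.nonEmpty) {
        failAnalysis("It is not allowed to specify partition columns when the table schema " +
          "is not defined.")
      }
    }
  }
}
```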
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e6ae42258d..b343454b12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -265,7 +265,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
val uri = path.toURI
- sql(
+ val sqlCreateTable =
s"""
|CREATE TABLE $tabName $schemaClause
|USING parquet
@@ -273,11 +273,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
| path '$uri'
|)
|$partitionClause
- """.stripMargin)
- val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+ """.stripMargin
+ if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) {
+ val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage
+ assert(e.contains(
+ "not allowed to specify partition columns when the table schema is not defined"))
+ } else {
+ sql(sqlCreateTable)
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
- assert(expectedSchema == tableMetadata.schema)
- assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+ assert(expectedSchema == tableMetadata.schema)
+ assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+ }
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2586d11a6c..7f50e38d30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -622,24 +622,26 @@ object HiveExternalCatalog {
def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
val props = metadata.properties
- props.get(DATASOURCE_SCHEMA).map { schema =>
+ val schema = props.get(DATASOURCE_SCHEMA)
+ if (schema.isDefined) {
// Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using `spark.sql.sources.schema` any more, we need to still support.
- DataType.fromJson(schema).asInstanceOf[StructType]
- } getOrElse {
- props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
- val parts = (0 until numParts.toInt).map { index =>
+ DataType.fromJson(schema.get).asInstanceOf[StructType]
+ } else {
+ val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
+ if (numSchemaParts.isDefined) {
+ val parts = (0 until numSchemaParts.get.toInt).map { index =>
val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(errorMessage +
- s" (missing part $index of the schema, $numParts parts are expected).")
+ s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
}
part
}
// Stick all parts back to a single schema string.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
- } getOrElse {
+ } else {
throw new AnalysisException(errorMessage)
}
}
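For readers not familiar with this corner of the catalog, the method being refactored reads a JSON-serialized schema that may be split across numbered table properties. A standalone sketch of that reassembly is below; the property key strings are assumed to mirror the `DATASOURCE_SCHEMA_NUMPARTS` and `DATASOURCE_SCHEMA_PART_PREFIX` constants referenced in the diff, and the properties map is invented for illustration.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaFromTableProperties {
  // Assumed key names, mirroring the constants used in HiveExternalCatalog.
  private val NumPartsKey = "spark.sql.sources.schema.numParts"
  private val PartPrefix  = "spark.sql.sources.schema.part."

  def getSchema(props: Map[String, String]): StructType = {
    val numParts = props.getOrElse(NumPartsKey,
      throw new IllegalArgumentException(
        "Could not read schema from the table properties: numParts is missing."))
    val parts = (0 until numParts.toInt).map { index =>
      props.getOrElse(s"$PartPrefix$index",
        throw new IllegalArgumentException(
          s"Missing part $index of the schema, $numParts parts are expected."))
    }
    // The numbered parts concatenate back into a single JSON string.
    DataType.fromJson(parts.mkString).asInstanceOf[StructType]
  }
}
```

For example, a schema stored as two halves under `...part.0` and `...part.1` is simply concatenated before being parsed back into a `StructType`.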