Diffstat (limited to 'sql/core/src')
7 files changed, 44 insertions, 75 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5e1ad9b885..22b1e07219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation,
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -928,13 +928,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
     }
     val comment = Option(ctx.STRING).map(string)
-    val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
-    val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+    val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
+    val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
     // Ensuring whether no duplicate name is used in table definition
-    val colNames = cols.map(_.name)
+    val colNames = dataCols.map(_.name)
     if (colNames.length != colNames.distinct.length) {
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
@@ -952,7 +952,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
-    val schema = cols ++ partitionCols
+    val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
     val defaultStorage: CatalogStorageFormat = {
@@ -1297,23 +1297,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a sequence of [[CatalogColumn]]s from a column list
-   */
-  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
-    ctx.colType.asScala.map { col =>
-      CatalogColumn(
-        col.identifier.getText.toLowerCase,
-        // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
-        // just convert the whole type string to lower case, otherwise the struct field names
-        // will no longer be case sensitive. Instead, we rely on our parser to get the proper
-        // case before passing it to Hive.
-        typedVisit[DataType](col.dataType).catalogString,
-        nullable = true,
-        Option(col.STRING).map(string))
-    }
-  }
-
-  /**
    * Create a [[ScriptInputOutputSchema]].
    */
   override protected def withScriptIOSchema(
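The net effect of the parser change: column lists now come back as Seq[StructField] through the shared visitColTypeList helper, and the table schema is the concatenation of data and partition columns wrapped in a StructType. A minimal sketch of that concatenation (the column values below are hypothetical stand-ins for what visitColTypeList would produce):

import org.apache.spark.sql.types._

// Hypothetical stand-ins for visitColTypeList output on
// CREATE TABLE t (id INT, name STRING) PARTITIONED BY (dt STRING)
val dataCols = Seq(StructField("id", IntegerType), StructField("name", StringType))
val partitionCols = Seq(StructField("dt", StringType))

// As in the patched SparkSqlAstBuilder: Hive keeps partition columns out of the
// data columns, so the catalog schema appends them explicitly at the end.
val schema = StructType(dataCols ++ partitionCols)
assert(schema.fieldNames.toSeq == Seq("id", "name", "dt"))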
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index fa3967c676..93eb386ade 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -395,7 +395,7 @@ object CreateDataSourceTableUtils extends Logging {
     CatalogTable(
       identifier = tableIdent,
       tableType = tableType,
-      schema = Nil,
+      schema = new StructType,
       storage = CatalogStorageFormat(
         locationUri = None,
         inputFormat = None,
@@ -424,9 +424,7 @@ object CreateDataSourceTableUtils extends Logging {
         compressed = false,
         properties = options
       ),
-      schema = relation.schema.map { f =>
-        CatalogColumn(f.name, f.dataType.catalogString)
-      },
+      schema = relation.schema,
       properties = tableProperties.toMap,
       viewText = None)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7e99593fbc..f0e49e65c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -518,7 +518,7 @@ object DDLUtils {
   }
 
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
+    table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
   // A persisted data source table always store its schema in the catalog.
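Since CatalogTable.schema is now a StructType, the data source path can store relation.schema unchanged, and "no schema recorded" becomes an empty StructType instead of Nil. A small sketch of the two spellings, using only the public types (no catalog involved):

import org.apache.spark.sql.types._

// "No schema in the catalog" is now an empty StructType rather than Nil.
val emptySchema = new StructType
assert(emptySchema.isEmpty)

// A relation's schema is stored as-is; the old per-field conversion to
// CatalogColumn(f.name, f.dataType.catalogString) disappears entirely.
val relationSchema = StructType(Seq(StructField("id", LongType), StructField("v", DoubleType)))
assert(relationSchema.map(_.dataType.catalogString) == Seq("bigint", "double"))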
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f85373c751..e6fe9a73a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -439,10 +439,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
         describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
       }
     } else {
-      if (table.partitionColumns.nonEmpty) {
+      if (table.partitionColumnNames.nonEmpty) {
         append(buffer, "# Partition Information", "", "")
         append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(table.partitionColumns, buffer)
+        describeSchema(table.partitionSchema, buffer)
       }
     }
   }
@@ -521,12 +521,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     }
   }
 
-  private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
-    schema.foreach { column =>
-      append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
-    }
-  }
-
   private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
@@ -701,7 +695,7 @@ case class ShowPartitionsCommand(
      * thrown if the partitioning spec is invalid.
      */
     if (spec.isDefined) {
-      val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+      val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
       if (badColumns.nonEmpty) {
         val badCols = badColumns.mkString("[", ", ", "]")
         throw new AnalysisException(
@@ -799,14 +793,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
       .foreach(builder.append)
   }
 
-  private def columnToDDLFragment(column: CatalogColumn): String = {
-    val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
-    s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+  private def columnToDDLFragment(column: StructField): String = {
+    val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+    s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
   }
 
   private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    if (metadata.partitionColumns.nonEmpty) {
-      val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+    if (metadata.partitionColumnNames.nonEmpty) {
+      val partCols = metadata.partitionSchema.map(columnToDDLFragment)
       builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
     }
 
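With comments now carried in each field's metadata, SHOW CREATE TABLE reads them back through getComment() and prints types via catalogString. The same logic sketched standalone — the quoting and escaping helpers below are simplified stand-ins for Spark's internal ones, not the real implementations:

import org.apache.spark.sql.types._

// Simplified stand-ins for Spark's internal quoteIdentifier/escapeSingleQuotedString.
def quoteIdentifier(name: String): String = s"`$name`"
def escapeSingleQuotedString(s: String): String = s.replace("'", "\\'")

def columnToDDLFragment(column: StructField): String = {
  val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
  s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
}

val field = StructField("dt", StringType).withComment("partition date")
assert(columnToDDLFragment(field) == "`dt` string COMMENT 'partition date'")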
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 901a9b9cf5..e397cfa058 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -21,10 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -161,18 +162,17 @@ case class CreateViewCommand(
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
   private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = {
-      val logicalPlan = if (userSpecifiedColumns.isEmpty) {
-        analyzedPlan
-      } else {
-        val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-          case (attr, (colName, _)) => Alias(attr, colName)()
-        }
-        sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, _)) => Alias(attr, colName)()
       }
-      new SQLBuilder(logicalPlan).toSQL
+      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
 
+    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
+
     // Validate the view SQL - make sure we can parse it and analyze it.
     // If we cannot analyze the generated query, there is probably a bug in SQL generation.
     try {
@@ -184,14 +184,11 @@ case class CreateViewCommand(
     }
 
     val viewSchema = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan.output.map { a =>
-        CatalogColumn(a.name, a.dataType.catalogString)
-      }
+      aliasedPlan.schema
     } else {
-      analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (a, (name, comment)) =>
-          CatalogColumn(name, a.dataType.catalogString, comment = comment)
-      }
+      StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
+        case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
+      })
     }
 
     CatalogTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5393b76161..f8f78723b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -157,8 +157,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val columns = tableMetadata.schema.map { c =>
       new Column(
         name = c.name,
-        description = c.comment.orNull,
-        dataType = c.dataType,
+        description = c.getComment().orNull,
+        dataType = c.dataType.catalogString,
         nullable = c.nullable,
         isPartition = partitionColumnNames.contains(c.name),
         isBucket = bucketColumnNames.contains(c.name))
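The new viewSchema branch keeps the aliased plan's field names and types and only layers the user-supplied comments on top via StructField.withComment. A sketch of that zip, with hypothetical column/comment pairs standing in for the parsed CREATE VIEW column list:

import org.apache.spark.sql.types._

// Schema produced by the aliased plan (names already match the user's column list).
val aliasedSchema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

// User-specified (name, optional comment) pairs, e.g. from
// CREATE VIEW v (id COMMENT 'pk', name) AS ...
val userSpecifiedColumns: Seq[(String, Option[String])] = Seq(("id", Some("pk")), ("name", None))

// Same shape as the patched CreateViewCommand: attach a comment where one was given.
val viewSchema = StructType(aliasedSchema.zip(userSpecifiedColumns).map {
  case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
})

assert(viewSchema("id").getComment() == Some("pk"))
assert(viewSchema("name").getComment() == None)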
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7bd1b0bcdb..564fc73ee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -89,11 +89,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = Seq(
-        CatalogColumn("col1", "int"),
-        CatalogColumn("col2", "string"),
-        CatalogColumn("a", "int"),
-        CatalogColumn("b", "int")),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "int"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L)
   }
@@ -258,9 +258,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       userSpecifiedPartitionCols: Option[String],
       expectedSchema: StructType,
       expectedPartitionCols: Seq[String]): Unit = {
-    var tableSchema = StructType(Nil)
-    var partCols = Seq.empty[String]
-
     val tabName = "tab1"
     withTable(tabName) {
       val partitionClause =
@@ -277,11 +274,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         """.stripMargin)
       val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
 
-      tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
-      partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+      assert(expectedSchema ==
+        DDLUtils.getSchemaFromTableProperties(tableMetadata))
+      assert(expectedPartitionCols ==
+        DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata))
     }
-    assert(tableSchema == expectedSchema)
-    assert(partCols == expectedPartitionCols)
   }
 
   test("Create partitioned data source table without user specified schema") {
@@ -601,7 +598,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
-      assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+      assert(table.schema == new StructType().add("a", "int").add("b", "int"))
       assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
     }
   }
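The tests switch to StructType's fluent add(name, dataTypeString) overload, which parses the DDL type string; it builds exactly the same schema as spelling out the StructFields. A minimal illustration:

import org.apache.spark.sql.types._

// add(name, "int") parses the type string, defaulting to nullable = true...
val viaAdd = new StructType().add("a", "int").add("b", "int")

// ...so it equals the explicit StructField construction.
val explicit = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", IntegerType)))

assert(viaAdd == explicit)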