author     Wenchen Fan <wenchen@databricks.com>    2016-07-31 18:18:53 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-07-31 18:18:53 -0700
commit     301fb0d7236eb55d53c9cd60804a2d755b4ad3b2 (patch)
tree       51383907d4c442831fac687f61711ba198949561 /sql/core
parent     064d91ff7342002414d3274694a8e2e37f154986 (diff)
[SPARK-16731][SQL] use StructType in CatalogTable and remove CatalogColumn
## What changes were proposed in this pull request?

`StructField` has very similar semantics to `CatalogColumn`, except that `CatalogColumn` uses a string to express the data type. It is therefore reasonable to use `StructType` as `CatalogTable.schema` and remove `CatalogColumn`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14363 from cloud-fan/column.
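To illustrate the intent of the change, here is a minimal sketch (with made-up column names, not code from this patch) of how a schema that was previously a `Seq[CatalogColumn]` carrying string-typed data types becomes a plain `StructType` of `StructField`s; the `withComment` helper is the one used in the diff below.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Before this patch (sketch): columns were described with CatalogColumn,
// which carried the data type as a plain string, e.g.
//   schema = Seq(
//     CatalogColumn("id", "int"),
//     CatalogColumn("name", "string", nullable = true, comment = Some("user name")))

// After this patch (sketch): CatalogTable.schema is a StructType, so the same
// columns are ordinary StructFields with typed dataType values and optional comments.
val schema: StructType = new StructType()
  .add("id", IntegerType)
  .add(StructField("name", StringType, nullable = true).withComment("user name"))
```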
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                  | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                     |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                  | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala                   | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                      |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                | 25
7 files changed, 44 insertions(+), 75 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5e1ad9b885..22b1e07219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation,
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
/**
* Concrete parser for Spark SQL statements.
@@ -928,13 +928,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}
val comment = Option(ctx.STRING).map(string)
- val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
- val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+ val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
+ val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
// Ensuring whether no duplicate name is used in table definition
- val colNames = cols.map(_.name)
+ val colNames = dataCols.map(_.name)
if (colNames.length != colNames.distinct.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
@@ -952,7 +952,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
- val schema = cols ++ partitionCols
+ val schema = StructType(dataCols ++ partitionCols)
// Storage format
val defaultStorage: CatalogStorageFormat = {
@@ -1297,23 +1297,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create a sequence of [[CatalogColumn]]s from a column list
- */
- private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
- ctx.colType.asScala.map { col =>
- CatalogColumn(
- col.identifier.getText.toLowerCase,
- // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
- // just convert the whole type string to lower case, otherwise the struct field names
- // will no longer be case sensitive. Instead, we rely on our parser to get the proper
- // case before passing it to Hive.
- typedVisit[DataType](col.dataType).catalogString,
- nullable = true,
- Option(col.STRING).map(string))
- }
- }
-
- /**
* Create a [[ScriptInputOutputSchema]].
*/
override protected def withScriptIOSchema(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index fa3967c676..93eb386ade 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -395,7 +395,7 @@ object CreateDataSourceTableUtils extends Logging {
CatalogTable(
identifier = tableIdent,
tableType = tableType,
- schema = Nil,
+ schema = new StructType,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -424,9 +424,7 @@ object CreateDataSourceTableUtils extends Logging {
compressed = false,
properties = options
),
- schema = relation.schema.map { f =>
- CatalogColumn(f.name, f.dataType.catalogString)
- },
+ schema = relation.schema,
properties = tableProperties.toMap,
viewText = None)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7e99593fbc..f0e49e65c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -518,7 +518,7 @@ object DDLUtils {
}
def isTablePartitioned(table: CatalogTable): Boolean = {
- table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
+ table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
}
// A persisted data source table always store its schema in the catalog.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f85373c751..e6fe9a73a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -439,10 +439,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
}
} else {
- if (table.partitionColumns.nonEmpty) {
+ if (table.partitionColumnNames.nonEmpty) {
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
- describeSchema(table.partitionColumns, buffer)
+ describeSchema(table.partitionSchema, buffer)
}
}
}
@@ -521,12 +521,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}
- private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
- schema.foreach { column =>
- append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
- }
- }
-
private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
@@ -701,7 +695,7 @@ case class ShowPartitionsCommand(
* thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
- val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+ val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
if (badColumns.nonEmpty) {
val badCols = badColumns.mkString("[", ", ", "]")
throw new AnalysisException(
@@ -799,14 +793,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
.foreach(builder.append)
}
- private def columnToDDLFragment(column: CatalogColumn): String = {
- val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
- s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+ private def columnToDDLFragment(column: StructField): String = {
+ val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+ s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
}
private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
- if (metadata.partitionColumns.nonEmpty) {
- val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+ if (metadata.partitionColumnNames.nonEmpty) {
+ val partCols = metadata.partitionSchema.map(columnToDDLFragment)
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 901a9b9cf5..e397cfa058 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -21,10 +21,11 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.types.StructType
/**
@@ -161,18 +162,17 @@ case class CreateViewCommand(
* SQL based on the analyzed plan, and also creates the proper schema for the view.
*/
private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
- val viewSQL: String = {
- val logicalPlan = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, _)) => Alias(attr, colName)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+ val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (attr, (colName, _)) => Alias(attr, colName)()
}
- new SQLBuilder(logicalPlan).toSQL
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
+ val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
+
// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
@@ -184,14 +184,11 @@ case class CreateViewCommand(
}
val viewSchema = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan.output.map { a =>
- CatalogColumn(a.name, a.dataType.catalogString)
- }
+ aliasedPlan.schema
} else {
- analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (a, (name, comment)) =>
- CatalogColumn(name, a.dataType.catalogString, comment = comment)
- }
+ StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
+ case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
+ })
}
CatalogTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5393b76161..f8f78723b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -157,8 +157,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
val columns = tableMetadata.schema.map { c =>
new Column(
name = c.name,
- description = c.comment.orNull,
- dataType = c.dataType,
+ description = c.getComment().orNull,
+ dataType = c.dataType.catalogString,
nullable = c.nullable,
isPartition = partitionColumnNames.contains(c.name),
isBucket = bucketColumnNames.contains(c.name))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7bd1b0bcdb..564fc73ee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -89,11 +89,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
- schema = Seq(
- CatalogColumn("col1", "int"),
- CatalogColumn("col2", "string"),
- CatalogColumn("a", "int"),
- CatalogColumn("b", "int")),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "int"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -258,9 +258,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
userSpecifiedPartitionCols: Option[String],
expectedSchema: StructType,
expectedPartitionCols: Seq[String]): Unit = {
- var tableSchema = StructType(Nil)
- var partCols = Seq.empty[String]
-
val tabName = "tab1"
withTable(tabName) {
val partitionClause =
@@ -277,11 +274,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
""".stripMargin)
val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
- tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
- partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+ assert(expectedSchema ==
+ DDLUtils.getSchemaFromTableProperties(tableMetadata))
+ assert(expectedPartitionCols ==
+ DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata))
}
- assert(tableSchema == expectedSchema)
- assert(partCols == expectedPartitionCols)
}
test("Create partitioned data source table without user specified schema") {
@@ -601,7 +598,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
- assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+ assert(table.schema == new StructType().add("a", "int").add("b", "int"))
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
}
}