author    Wenchen Fan <wenchen@databricks.com>    2016-07-31 18:18:53 -0700
committer Yin Huai <yhuai@databricks.com>    2016-07-31 18:18:53 -0700
commit 301fb0d7236eb55d53c9cd60804a2d755b4ad3b2 (patch)
tree   51383907d4c442831fac687f61711ba198949561 /sql
parent 064d91ff7342002414d3274694a8e2e37f154986 (diff)
[SPARK-16731][SQL] use StructType in CatalogTable and remove CatalogColumn
## What changes were proposed in this pull request?

`StructField` has very similar semantics to `CatalogColumn`, except that `CatalogColumn` uses a string to express the data type. I think it's reasonable to use `StructType` as the type of `CatalogTable.schema` and remove `CatalogColumn`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14363 from cloud-fan/column.
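To make the shape of the change concrete, here is a minimal, hypothetical sketch (the identifiers below are illustrative and not taken from the patch) of how a table schema is expressed after this refactoring, using only the `StructType`/`StructField` calls the diff itself relies on (`add`, `withComment`, `getComment`):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch only: with this change, CatalogTable.schema is a StructType rather than
// a Seq[CatalogColumn], so column types are real DataTypes and per-column
// comments are carried on each StructField instead of a separate comment field.
val nameField = StructField("name", StringType).withComment("display name")

val schema = new StructType()
  .add("id", "int")   // DDL type string, parsed into a DataType internally
  .add(nameField)

// Commands such as DESCRIBE read comments back via getComment().
assert(schema("name").getComment() == Some("display name"))
```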
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala                 |  9
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala                      | 50
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala           | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                            | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala            |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                               |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                            | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala                             | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                                |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                          | 25
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala                              | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                          | 24
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala       |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala                            | 29
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala                      |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala                      |  7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala                           |  6
17 files changed, 120 insertions, 175 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e36241a436..980efda6cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,14 +259,7 @@ class SessionCatalog(
identifier = tid,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = tempTables(table).output.map { c =>
- CatalogColumn(
- name = c.name,
- dataType = c.dataType.catalogString,
- nullable = c.nullable,
- comment = Option(c.name)
- )
- },
+ schema = tempTables(table).output.toStructType,
properties = Map(),
viewText = None)
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 710bce5da9..38f0bc2c4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.catalyst.catalog
import java.util.Date
-import javax.annotation.Nullable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -26,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.types.StructType
/**
@@ -78,28 +78,6 @@ object CatalogStorageFormat {
}
/**
- * A column in a table.
- */
-case class CatalogColumn(
- name: String,
- // TODO: make this type-safe; this is left as a string due to issues in converting Hive
- // varchars to and from SparkSQL strings.
- dataType: String,
- nullable: Boolean = true,
- comment: Option[String] = None) {
-
- override def toString: String = {
- val output =
- Seq(s"`$name`",
- dataType,
- if (!nullable) "NOT NULL" else "",
- comment.map("(" + _ + ")").getOrElse(""))
- output.filter(_.nonEmpty).mkString(" ")
- }
-
-}
-
-/**
* A partition (Hive style) defined in the catalog.
*
* @param spec partition spec values indexed by column name
@@ -141,7 +119,7 @@ case class CatalogTable(
identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
- schema: Seq[CatalogColumn],
+ schema: StructType,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
@@ -163,9 +141,10 @@ case class CatalogTable(
requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
- /** Columns this table is partitioned by. */
- def partitionColumns: Seq[CatalogColumn] =
- schema.filter { c => partitionColumnNames.contains(c.name) }
+ /** schema of this table's partition columns */
+ def partitionSchema: StructType = StructType(schema.filter {
+ c => partitionColumnNames.contains(c.name)
+ })
/** Return the database this table was specified to belong to, assuming it exists. */
def database: String = identifier.database.getOrElse {
@@ -277,16 +256,13 @@ case class SimpleCatalogRelation(
override lazy val resolved: Boolean = false
override val output: Seq[Attribute] = {
- val cols = catalogTable.schema
- .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
- (cols ++ catalogTable.partitionColumns).map { f =>
- AttributeReference(
- f.name,
- CatalystSqlParser.parseDataType(f.dataType),
- // Since data can be dumped in randomly with no validation, everything is nullable.
- nullable = true
- )(qualifier = Some(metadata.identifier.table))
- }
+ val (partCols, dataCols) = metadata.schema.toAttributes
+ // Since data can be dumped in randomly with no validation, everything is nullable.
+ .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
+ .partition { a =>
+ metadata.partitionColumnNames.contains(a.name)
+ }
+ dataCols ++ partCols
}
require(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 3a0dcea903..963a225cdf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -551,7 +552,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
- schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+ schema = new StructType().add("a", "int").add("b", "string")
)
catalog.createTable("db1", table, ignoreIfExists = false)
@@ -570,7 +571,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
storage = CatalogStorageFormat(
Some(Utils.createTempDir().getAbsolutePath),
None, None, None, false, Map.empty),
- schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+ schema = new StructType().add("a", "int").add("b", "string")
)
catalog.createTable("db1", externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
@@ -583,11 +584,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
- schema = Seq(
- CatalogColumn("col1", "int"),
- CatalogColumn("col2", "string"),
- CatalogColumn("a", "int"),
- CatalogColumn("b", "string")),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "string"),
partitionColumnNames = Seq("a", "b")
)
catalog.createTable("db1", table, ignoreIfExists = false)
@@ -686,11 +687,11 @@ abstract class CatalogTestUtils {
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL,
storage = storageFormat,
- schema = Seq(
- CatalogColumn("col1", "int"),
- CatalogColumn("col2", "string"),
- CatalogColumn("a", "int"),
- CatalogColumn("b", "string")),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "string"),
partitionColumnNames = Seq("a", "b"),
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5e1ad9b885..22b1e07219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation,
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
/**
* Concrete parser for Spark SQL statements.
@@ -928,13 +928,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}
val comment = Option(ctx.STRING).map(string)
- val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
- val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+ val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
+ val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
// Ensuring whether no duplicate name is used in table definition
- val colNames = cols.map(_.name)
+ val colNames = dataCols.map(_.name)
if (colNames.length != colNames.distinct.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
@@ -952,7 +952,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
- val schema = cols ++ partitionCols
+ val schema = StructType(dataCols ++ partitionCols)
// Storage format
val defaultStorage: CatalogStorageFormat = {
@@ -1297,23 +1297,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create a sequence of [[CatalogColumn]]s from a column list
- */
- private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
- ctx.colType.asScala.map { col =>
- CatalogColumn(
- col.identifier.getText.toLowerCase,
- // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
- // just convert the whole type string to lower case, otherwise the struct field names
- // will no longer be case sensitive. Instead, we rely on our parser to get the proper
- // case before passing it to Hive.
- typedVisit[DataType](col.dataType).catalogString,
- nullable = true,
- Option(col.STRING).map(string))
- }
- }
-
- /**
* Create a [[ScriptInputOutputSchema]].
*/
override protected def withScriptIOSchema(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index fa3967c676..93eb386ade 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -395,7 +395,7 @@ object CreateDataSourceTableUtils extends Logging {
CatalogTable(
identifier = tableIdent,
tableType = tableType,
- schema = Nil,
+ schema = new StructType,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -424,9 +424,7 @@ object CreateDataSourceTableUtils extends Logging {
compressed = false,
properties = options
),
- schema = relation.schema.map { f =>
- CatalogColumn(f.name, f.dataType.catalogString)
- },
+ schema = relation.schema,
properties = tableProperties.toMap,
viewText = None)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7e99593fbc..f0e49e65c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -518,7 +518,7 @@ object DDLUtils {
}
def isTablePartitioned(table: CatalogTable): Boolean = {
- table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
+ table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
}
// A persisted data source table always store its schema in the catalog.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f85373c751..e6fe9a73a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -439,10 +439,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
}
} else {
- if (table.partitionColumns.nonEmpty) {
+ if (table.partitionColumnNames.nonEmpty) {
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
- describeSchema(table.partitionColumns, buffer)
+ describeSchema(table.partitionSchema, buffer)
}
}
}
@@ -521,12 +521,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}
- private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
- schema.foreach { column =>
- append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
- }
- }
-
private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
@@ -701,7 +695,7 @@ case class ShowPartitionsCommand(
* thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
- val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+ val badColumns = spec.get.keySet.filterNot(tab.partitionColumnNames.contains)
if (badColumns.nonEmpty) {
val badCols = badColumns.mkString("[", ", ", "]")
throw new AnalysisException(
@@ -799,14 +793,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
.foreach(builder.append)
}
- private def columnToDDLFragment(column: CatalogColumn): String = {
- val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
- s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+ private def columnToDDLFragment(column: StructField): String = {
+ val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+ s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
}
private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
- if (metadata.partitionColumns.nonEmpty) {
- val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+ if (metadata.partitionColumnNames.nonEmpty) {
+ val partCols = metadata.partitionSchema.map(columnToDDLFragment)
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 901a9b9cf5..e397cfa058 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -21,10 +21,11 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.types.StructType
/**
@@ -161,18 +162,17 @@ case class CreateViewCommand(
* SQL based on the analyzed plan, and also creates the proper schema for the view.
*/
private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
- val viewSQL: String = {
- val logicalPlan = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, _)) => Alias(attr, colName)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+ val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (attr, (colName, _)) => Alias(attr, colName)()
}
- new SQLBuilder(logicalPlan).toSQL
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
+ val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
+
// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
@@ -184,14 +184,11 @@ case class CreateViewCommand(
}
val viewSchema = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan.output.map { a =>
- CatalogColumn(a.name, a.dataType.catalogString)
- }
+ aliasedPlan.schema
} else {
- analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (a, (name, comment)) =>
- CatalogColumn(name, a.dataType.catalogString, comment = comment)
- }
+ StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
+ case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
+ })
}
CatalogTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5393b76161..f8f78723b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -157,8 +157,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
val columns = tableMetadata.schema.map { c =>
new Column(
name = c.name,
- description = c.comment.orNull,
- dataType = c.dataType,
+ description = c.getComment().orNull,
+ dataType = c.dataType.catalogString,
nullable = c.nullable,
isPartition = partitionColumnNames.contains(c.name),
isBucket = bucketColumnNames.contains(c.name))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7bd1b0bcdb..564fc73ee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -89,11 +89,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
- schema = Seq(
- CatalogColumn("col1", "int"),
- CatalogColumn("col2", "string"),
- CatalogColumn("a", "int"),
- CatalogColumn("b", "int")),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("a", "int")
+ .add("b", "int"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -258,9 +258,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
userSpecifiedPartitionCols: Option[String],
expectedSchema: StructType,
expectedPartitionCols: Seq[String]): Unit = {
- var tableSchema = StructType(Nil)
- var partCols = Seq.empty[String]
-
val tabName = "tab1"
withTable(tabName) {
val partitionClause =
@@ -277,11 +274,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
""".stripMargin)
val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
- tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
- partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
+ assert(expectedSchema ==
+ DDLUtils.getSchemaFromTableProperties(tableMetadata))
+ assert(expectedPartitionCols ==
+ DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata))
}
- assert(tableSchema == expectedSchema)
- assert(partCols == expectedPartitionCols)
}
test("Create partitioned data source table without user specified schema") {
@@ -601,7 +598,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
- assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+ assert(table.schema == new StructType().add("a", "int").add("b", "int"))
assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index f3c849b9f2..195fce8354 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.types.StructField
private[hive] case class MetastoreRelation(
@@ -61,8 +61,8 @@ private[hive] case class MetastoreRelation(
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
- private def toHiveColumn(c: CatalogColumn): FieldSchema = {
- new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ private def toHiveColumn(c: StructField): FieldSchema = {
+ new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
}
// TODO: merge this with HiveClientImpl#toHiveTable
@@ -200,17 +200,17 @@ private[hive] case class MetastoreRelation(
hiveQlTable.getMetadata
)
- implicit class SchemaAttribute(f: CatalogColumn) {
+ implicit class SchemaAttribute(f: StructField) {
def toAttribute: AttributeReference = AttributeReference(
f.name,
- CatalystSqlParser.parseDataType(f.dataType),
+ f.dataType,
// Since data can be dumped in randomly with no validation, everything is nullable.
nullable = true
)(qualifier = Some(tableName))
}
/** PartitionKey attributes */
- val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
+ val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
/** Non-partitionKey attributes */
// TODO: just make this hold the schema itself, not just non-partition columns
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2392cc0bdd..ef69ac76f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,8 +43,10 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPa
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.{CircularBuffer, Utils}
/**
@@ -336,7 +338,7 @@ private[hive] class HiveClientImpl(
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
- val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+ val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
// Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
val unsupportedFeatures = ArrayBuffer.empty[String]
@@ -721,16 +723,22 @@ private[hive] class HiveClientImpl(
Utils.classForName(name)
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
- private def toHiveColumn(c: CatalogColumn): FieldSchema = {
- new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ private def toHiveColumn(c: StructField): FieldSchema = {
+ new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
}
- private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
- new CatalogColumn(
+ private def fromHiveColumn(hc: FieldSchema): StructField = {
+ val columnType = try {
+ CatalystSqlParser.parseDataType(hc.getType)
+ } catch {
+ case e: ParseException =>
+ throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
+ }
+ val field = StructField(
name = hc.getName,
- dataType = hc.getType,
- nullable = true,
- comment = Option(hc.getComment))
+ dataType = columnType,
+ nullable = true)
+ Option(hc.getComment).map(field.withComment).getOrElse(field)
}
private def toHiveTable(table: CatalogTable): HiveTable = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 2762e0cdd5..678bf8da73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation
@@ -65,9 +65,7 @@ case class CreateHiveTableAsSelectCommand(
val withSchema = if (withFormat.schema.isEmpty) {
// Hive doesn't support specifying the column list for target table in CTAS
// However we don't think SparkSQL should follow that.
- tableDesc.copy(schema = query.output.map { c =>
- CatalogColumn(c.name, c.dataType.catalogString)
- })
+ tableDesc.copy(schema = query.output.toStructType)
} else {
withFormat
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 5450fba753..e0c07db3b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans
import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.types.StructType
class HiveDDLCommandSuite extends PlanTest {
val parser = TestHive.sessionState.sqlParser
@@ -67,7 +68,7 @@ class HiveDDLCommandSuite extends PlanTest {
// TODO will be SQLText
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
- assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+ assert(desc.partitionColumnNames.isEmpty)
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==
@@ -98,7 +99,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.comment == Some("This is the staging page view table"))
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
- assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+ assert(desc.partitionColumnNames.isEmpty)
assert(desc.storage.properties == Map())
assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -114,7 +115,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
- assert(desc.schema == Seq.empty[CatalogColumn])
+ assert(desc.schema.isEmpty)
assert(desc.viewText == None) // TODO will be SQLText
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.properties == Map())
@@ -150,7 +151,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.table == "ctas2")
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
- assert(desc.schema == Seq.empty[CatalogColumn])
+ assert(desc.schema.isEmpty)
assert(desc.viewText == None) // TODO will be SQLText
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.properties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
@@ -291,7 +292,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "my_table")
assert(desc.tableType == CatalogTableType.MANAGED)
- assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
+ assert(desc.schema == new StructType().add("id", "int").add("name", "string"))
assert(desc.partitionColumnNames.isEmpty)
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
@@ -342,10 +343,10 @@ class HiveDDLCommandSuite extends PlanTest {
test("create table - partitioned columns") {
val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
val (desc, _) = extractTableDesc(query)
- assert(desc.schema == Seq(
- CatalogColumn("id", "int"),
- CatalogColumn("name", "string"),
- CatalogColumn("month", "int")))
+ assert(desc.schema == new StructType()
+ .add("id", "int")
+ .add("name", "string")
+ .add("month", "int"))
assert(desc.partitionColumnNames == Seq("month"))
}
@@ -446,10 +447,10 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.database == Some("dbx"))
assert(desc.identifier.table == "my_table")
assert(desc.tableType == CatalogTableType.EXTERNAL)
- assert(desc.schema == Seq(
- CatalogColumn("id", "int"),
- CatalogColumn("name", "string"),
- CatalogColumn("month", "int")))
+ assert(desc.schema == new StructType()
+ .add("id", "int")
+ .add("name", "string")
+ .add("month", "int"))
assert(desc.partitionColumnNames == Seq("month"))
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 754aabb5ac..9d72367f43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType}
class HiveMetastoreCatalogSuite extends TestHiveSingleton {
import spark.implicits._
@@ -102,7 +102,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+ assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
checkAnswer(table("t"), testDF)
assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -135,7 +135,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+ assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
checkAnswer(table("t"), testDF)
assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
@@ -166,7 +166,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.dataType) === Seq("int", "string"))
+ assert(columns.map(_.dataType) === Seq(IntegerType, StringType))
checkAnswer(table("t"), Row(1, "val_1"))
assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 571cae001c..c87bda9047 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -726,7 +726,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val hiveTable = CatalogTable(
identifier = TableIdentifier(tableName, Some("default")),
tableType = CatalogTableType.MANAGED,
- schema = Seq.empty,
+ schema = new StructType,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -998,7 +998,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in Hive compatible format,
// we verify that each column of the table is of native type StringType.
assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
- .forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType))
+ .forall(_.dataType == StringType))
createDataSourceTable(
sparkSession = spark,
@@ -1013,8 +1013,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in SparkSQL format,
// we verify that the table has a column type as array of StringType.
assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
- .schema.forall { c =>
- CatalystSqlParser.parseDataType(c.dataType) == ArrayType(StringType) })
+ .schema.forall(_.dataType == ArrayType(StringType)))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 066c3ffaba..a2509f2a75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
import java.io.{ByteArrayOutputStream, File, PrintStream}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -32,10 +31,11 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -146,7 +146,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
CatalogTable(
identifier = TableIdentifier(tableName, Some(database)),
tableType = CatalogTableType.MANAGED,
- schema = Seq(CatalogColumn("key", "int")),
+ schema = new StructType().add("key", "int"),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = Some(classOf[TextInputFormat].getName),