From 301fb0d7236eb55d53c9cd60804a2d755b4ad3b2 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sun, 31 Jul 2016 18:18:53 -0700
Subject: [SPARK-16731][SQL] use StructType in CatalogTable and remove CatalogColumn

## What changes were proposed in this pull request?

`StructField` has very similar semantics to `CatalogColumn`, except that `CatalogColumn` uses a string to express its data type. I think it is reasonable to make `CatalogTable.schema` a `StructType` and remove `CatalogColumn`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #14363 from cloud-fan/column.
---
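
Note (illustrative, not part of the commit): the change relies on a one-to-one
mapping from the removed `CatalogColumn` to `StructField`: the string data type
is parsed into a typed `DataType`, and the column comment moves into the field
metadata. Below is a minimal sketch of that mapping, assuming the Spark 2.x
`CatalystSqlParser` and `MetadataBuilder` APIs; `OldColumn` is a hypothetical
stand-in for the deleted class:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StructField, StructType}

// Hypothetical stand-in for the removed CatalogColumn, shown only to
// illustrate the shape of the old representation.
case class OldColumn(
    name: String,
    dataType: String, // data type kept as a raw string, e.g. "array<int>"
    nullable: Boolean = true,
    comment: Option[String] = None)

// One CatalogColumn corresponds to one StructField: parse the string into
// a typed DataType and carry the comment in the field metadata.
def toStructField(c: OldColumn): StructField = {
  val meta = c.comment
    .map(m => new MetadataBuilder().putString("comment", m).build())
    .getOrElse(Metadata.empty)
  StructField(c.name, CatalystSqlParser.parseDataType(c.dataType), c.nullable, meta)
}

// A whole column list folds into the StructType that CatalogTable.schema now holds.
def toSchema(cols: Seq[OldColumn]): StructType = StructType(cols.map(toStructField))
```
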
 .../sql/catalyst/catalog/SessionCatalog.scala      |  9 +---
 .../spark/sql/catalyst/catalog/interface.scala     | 50 ++++++----------------
 2 files changed, 14 insertions(+), 45 deletions(-)

(limited to 'sql/catalyst/src/main')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e36241a436..980efda6cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,14 +259,7 @@ class SessionCatalog(
         identifier = tid,
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = tempTables(table).output.map { c =>
-          CatalogColumn(
-            name = c.name,
-            dataType = c.dataType.catalogString,
-            nullable = c.nullable,
-            comment = Option(c.name)
-          )
-        },
+        schema = tempTables(table).output.toStructType,
         properties = Map(),
         viewText = None)
     } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 710bce5da9..38f0bc2c4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import java.util.Date
-import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -26,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -77,28 +77,6 @@ object CatalogStorageFormat {
     outputFormat = None, serde = None, compressed = false, properties = Map.empty)
 }
 
-/**
- * A column in a table.
- */
-case class CatalogColumn(
-    name: String,
-    // TODO: make this type-safe; this is left as a string due to issues in converting Hive
-    // varchars to and from SparkSQL strings.
-    dataType: String,
-    nullable: Boolean = true,
-    comment: Option[String] = None) {
-
-  override def toString: String = {
-    val output =
-      Seq(s"`$name`",
-        dataType,
-        if (!nullable) "NOT NULL" else "",
-        comment.map("(" + _ + ")").getOrElse(""))
-    output.filter(_.nonEmpty).mkString(" ")
-  }
-
-}
-
 /**
  * A partition (Hive style) defined in the catalog.
  *
@@ -141,7 +119,7 @@ case class CatalogTable(
     identifier: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
-    schema: Seq[CatalogColumn],
+    schema: StructType,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
@@ -163,9 +141,10 @@ case class CatalogTable(
   requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
   requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
 
-  /** Columns this table is partitioned by. */
-  def partitionColumns: Seq[CatalogColumn] =
-    schema.filter { c => partitionColumnNames.contains(c.name) }
+  /** schema of this table's partition columns */
+  def partitionSchema: StructType = StructType(schema.filter {
+    c => partitionColumnNames.contains(c.name)
+  })
 
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
@@ -277,16 +256,13 @@ case class SimpleCatalogRelation(
   override lazy val resolved: Boolean = false
 
   override val output: Seq[Attribute] = {
-    val cols = catalogTable.schema
-      .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
-    (cols ++ catalogTable.partitionColumns).map { f =>
-      AttributeReference(
-        f.name,
-        CatalystSqlParser.parseDataType(f.dataType),
-        // Since data can be dumped in randomly with no validation, everything is nullable.
-        nullable = true
-      )(qualifier = Some(metadata.identifier.table))
-    }
+    val (partCols, dataCols) = metadata.schema.toAttributes
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
+      .partition { a =>
+        metadata.partitionColumnNames.contains(a.name)
+      }
+    dataCols ++ partCols
   }
 
   require(
--
cgit v1.2.3
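
Note (illustrative, not part of the commit): with the table schema unified into
a single `StructType`, the new `partitionSchema` is just a filter over it;
`StructType` is a `Seq[StructField]`, so the `filter` in the diff applies
directly. A quick sketch with hypothetical column names:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical table layout: two data columns plus one partition column.
val schema = new StructType()
  .add("id", LongType)
  .add("data", StringType)
  .add("part", StringType)
val partitionColumnNames = Seq("part")

// Mirrors the new CatalogTable.partitionSchema: keep only the fields that
// are declared as partition columns.
val partitionSchema = StructType(schema.filter(f => partitionColumnNames.contains(f.name)))
// partitionSchema holds the single field StructField(part,StringType,true)
```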