author     Wenchen Fan <wenchen@databricks.com>    2016-07-31 18:18:53 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-07-31 18:18:53 -0700
commit     301fb0d7236eb55d53c9cd60804a2d755b4ad3b2 (patch)
tree       51383907d4c442831fac687f61711ba198949561 /sql/hive/src
parent     064d91ff7342002414d3274694a8e2e37f154986 (diff)
[SPARK-16731][SQL] use StructType in CatalogTable and remove CatalogColumn
## What changes were proposed in this pull request?
`StructField` has very similar semantics to `CatalogColumn`, except that `CatalogColumn` uses a string to express the data type. I think it's reasonable to use `StructType` as the type of `CatalogTable.schema` and remove `CatalogColumn`.
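As a minimal sketch of the idea (the field name and comment here are illustrative, not from the patch): a `StructField` can carry everything `CatalogColumn` did, with the comment held in the field's metadata and the string form of the type still recoverable when Hive needs it:

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Previously: CatalogColumn("id", "int", nullable = true, comment = Some("primary key"))
val idField = StructField("id", IntegerType, nullable = true).withComment("primary key")

assert(idField.getComment() == Some("primary key"))
assert(idField.dataType.catalogString == "int") // string form for the Hive metastore

// CatalogTable.schema becomes a StructType instead of Seq[CatalogColumn].
val schema = new StructType().add(idField).add("name", "string")
```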
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #14363 from cloud-fan/column.
Diffstat (limited to 'sql/hive/src')
7 files changed, 49 insertions, 43 deletions
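Before the full diff, a condensed sketch of the central conversion in `HiveClientImpl` (simplified from the hunks below; the `SparkException` wrapper around parse failures is elided here):

```scala
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructField

// Hive -> Spark: parse the metastore's type string into a real DataType once,
// at read time, instead of re-parsing it at every use site.
def fromHiveColumn(hc: FieldSchema): StructField = {
  val field =
    StructField(hc.getName, CatalystSqlParser.parseDataType(hc.getType), nullable = true)
  Option(hc.getComment).map(field.withComment).getOrElse(field)
}

// Spark -> Hive: catalogString recovers the string form the metastore stores.
def toHiveColumn(c: StructField): FieldSchema =
  new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
```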
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index f3c849b9f2..195fce8354 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.types.StructField
 
 private[hive] case class MetastoreRelation(
@@ -61,8 +61,8 @@ private[hive] case class MetastoreRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
-  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
-    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable
@@ -200,17 +200,17 @@ private[hive] case class MetastoreRelation(
     hiveQlTable.getMetadata
   )
 
-  implicit class SchemaAttribute(f: CatalogColumn) {
+  implicit class SchemaAttribute(f: StructField) {
     def toAttribute: AttributeReference = AttributeReference(
       f.name,
-      CatalystSqlParser.parseDataType(f.dataType),
+      f.dataType,
       // Since data can be dumped in randomly with no validation, everything is nullable.
       nullable = true
     )(qualifier = Some(tableName))
   }
 
   /** PartitionKey attributes */
-  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
+  val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
   // TODO: just make this hold the schema itself, not just non-partition columns
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2392cc0bdd..ef69ac76f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,8 +43,10 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPa
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -336,7 +338,7 @@ private[hive] class HiveClientImpl(
       // Note: Hive separates partition columns and the schema, but for us the
       // partition columns are part of the schema
       val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-      val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+      val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
 
       // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
       val unsupportedFeatures = ArrayBuffer.empty[String]
@@ -721,16 +723,22 @@ private[hive] class HiveClientImpl(
     Utils.classForName(name)
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
-  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
-    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
   }
 
-  private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
-    new CatalogColumn(
+  private def fromHiveColumn(hc: FieldSchema): StructField = {
+    val columnType = try {
+      CatalystSqlParser.parseDataType(hc.getType)
+    } catch {
+      case e: ParseException =>
+        throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
+    }
+    val field = StructField(
       name = hc.getName,
-      dataType = hc.getType,
-      nullable = true,
-      comment = Option(hc.getComment))
+      dataType = columnType,
+      nullable = true)
+    Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
   private def toHiveTable(table: CatalogTable): HiveTable = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 2762e0cdd5..678bf8da73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
@@ -65,9 +65,7 @@ case class CreateHiveTableAsSelectCommand(
     val withSchema = if (withFormat.schema.isEmpty) {
       // Hive doesn't support specifying the column list for target table in CTAS
       // However we don't think SparkSQL should follow that.
-      tableDesc.copy(schema = query.output.map { c =>
-        CatalogColumn(c.name, c.dataType.catalogString)
-      })
+      tableDesc.copy(schema = query.output.toStructType)
     } else {
       withFormat
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 5450fba753..e0c07db3b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLCommandSuite extends PlanTest {
   val parser = TestHive.sessionState.sqlParser
@@ -67,7 +68,7 @@ class HiveDDLCommandSuite extends PlanTest {
     // TODO will be SQLText
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+    assert(desc.partitionColumnNames.isEmpty)
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
@@ -98,7 +99,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.comment == Some("This is the staging page view table"))
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+    assert(desc.partitionColumnNames.isEmpty)
     assert(desc.storage.properties == Map())
     assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -114,7 +115,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.storage.locationUri == None)
-    assert(desc.schema == Seq.empty[CatalogColumn])
+    assert(desc.schema.isEmpty)
     assert(desc.viewText == None) // TODO will be SQLText
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.properties == Map())
@@ -150,7 +151,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.table == "ctas2")
     assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.storage.locationUri == None)
-    assert(desc.schema == Seq.empty[CatalogColumn])
+    assert(desc.schema.isEmpty)
     assert(desc.viewText == None) // TODO will be SQLText
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.properties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
@@ -291,7 +292,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "my_table")
     assert(desc.tableType == CatalogTableType.MANAGED)
-    assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
+    assert(desc.schema == new StructType().add("id", "int").add("name", "string"))
     assert(desc.partitionColumnNames.isEmpty)
     assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
@@ -342,10 +343,10 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create table - partitioned columns") {
     val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
     val (desc, _) = extractTableDesc(query)
-    assert(desc.schema == Seq(
-      CatalogColumn("id", "int"),
-      CatalogColumn("name", "string"),
-      CatalogColumn("month", "int")))
+    assert(desc.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string")
+      .add("month", "int"))
     assert(desc.partitionColumnNames == Seq("month"))
   }
 
@@ -446,10 +447,10 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.database == Some("dbx"))
     assert(desc.identifier.table == "my_table")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.schema == Seq(
-      CatalogColumn("id", "int"),
-      CatalogColumn("name", "string"),
-      CatalogColumn("month", "int")))
+    assert(desc.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string")
+      .add("month", "int"))
     assert(desc.partitionColumnNames == Seq("month"))
     assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 754aabb5ac..9d72367f43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType}
 
 class HiveMetastoreCatalogSuite extends TestHiveSingleton {
   import spark.implicits._
@@ -102,7 +102,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         val columns = hiveTable.schema
         assert(columns.map(_.name) === Seq("d1", "d2"))
-        assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+        assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
         assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
           Seq("1.1\t1", "2.1\t2"))
@@ -135,7 +135,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+          assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
           checkAnswer(table("t"), testDF)
           assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
@@ -166,7 +166,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.dataType) === Seq("int", "string"))
+          assert(columns.map(_.dataType) === Seq(IntegerType, StringType))
 
           checkAnswer(table("t"), Row(1, "val_1"))
           assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 571cae001c..c87bda9047 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -726,7 +726,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = Seq.empty,
+        schema = new StructType,
         storage = CatalogStorageFormat(
           locationUri = None,
           inputFormat = None,
@@ -998,7 +998,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in Hive compatible format,
       // we verify that each column of the table is of native type StringType.
       assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
-        .forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType))
+        .forall(_.dataType == StringType))
 
       createDataSourceTable(
         sparkSession = spark,
@@ -1013,8 +1013,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in SparkSQL format,
       // we verify that the table has a column type as array of StringType.
       assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
-        .schema.forall { c =>
-          CatalystSqlParser.parseDataType(c.dataType) == ArrayType(StringType) })
+        .schema.forall(_.dataType == ArrayType(StringType)))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 066c3ffaba..a2509f2a75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
 import java.io.{ByteArrayOutputStream, File, PrintStream}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
@@ -32,10 +31,11 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -146,7 +146,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     CatalogTable(
       identifier = TableIdentifier(tableName, Some(database)),
       tableType = CatalogTableType.MANAGED,
-      schema = Seq(CatalogColumn("key", "int")),
+      schema = new StructType().add("key", "int"),
       storage = CatalogStorageFormat(
         locationUri = None,
         inputFormat = Some(classOf[TextInputFormat].getName),