path: root/sql/hive
author Wenchen Fan <wenchen@databricks.com> 2016-07-31 18:18:53 -0700
committer Yin Huai <yhuai@databricks.com> 2016-07-31 18:18:53 -0700
commit 301fb0d7236eb55d53c9cd60804a2d755b4ad3b2 (patch)
tree 51383907d4c442831fac687f61711ba198949561 /sql/hive
parent 064d91ff7342002414d3274694a8e2e37f154986 (diff)
[SPARK-16731][SQL] use StructType in CatalogTable and remove CatalogColumn
## What changes were proposed in this pull request?

`StructField` has very similar semantics to `CatalogColumn`, except that `CatalogColumn` uses a string to express its data type. I think it's reasonable to use `StructType` as the `CatalogTable.schema` and remove `CatalogColumn`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14363 from cloud-fan/column.
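For context, a minimal illustrative sketch (not part of the patch) of the representation this change moves to: `CatalogTable.schema` becomes a `StructType` whose fields carry parsed Catalyst types, and the string form only appears at the Hive metastore boundary. The calls used below (`StructType.add`, `StructField.withComment`, `DataType.catalogString`, `CatalystSqlParser.parseDataType`) all appear in the diff itself; the object name is hypothetical.

```scala
// Illustrative sketch only, not part of this patch. It mirrors the schema
// handling introduced here: columns are StructFields with real Catalyst types,
// and type strings are produced/parsed only at the Hive metastore boundary.
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CatalogSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Before this change: Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")).
    // After: a StructType with parsed data types and optional per-field comments.
    val schema = new StructType()
      .add("id", IntegerType)
      .add(StructField("name", StringType, nullable = true).withComment("display name"))

    // Writing to the metastore: each field's type is rendered as a Hive-compatible
    // string, as toHiveColumn does in this patch.
    schema.fields.foreach(f => println(s"${f.name}: ${f.dataType.catalogString}"))

    // Reading back: the type string is parsed into a Catalyst DataType, as
    // fromHiveColumn does in this patch (throwing on unrecognized type strings).
    val parsed = CatalystSqlParser.parseDataType("decimal(10,3)")
    println(parsed) // e.g. DecimalType(10,3)
  }
}
```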
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 12
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 24
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala | 6
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 29
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala | 8
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 7
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 6
7 files changed, 49 insertions(+), 43 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index f3c849b9f2..195fce8354 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.types.StructField
private[hive] case class MetastoreRelation(
@@ -61,8 +61,8 @@ private[hive] case class MetastoreRelation(
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
- private def toHiveColumn(c: CatalogColumn): FieldSchema = {
- new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ private def toHiveColumn(c: StructField): FieldSchema = {
+ new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
}
// TODO: merge this with HiveClientImpl#toHiveTable
@@ -200,17 +200,17 @@ private[hive] case class MetastoreRelation(
hiveQlTable.getMetadata
)
- implicit class SchemaAttribute(f: CatalogColumn) {
+ implicit class SchemaAttribute(f: StructField) {
def toAttribute: AttributeReference = AttributeReference(
f.name,
- CatalystSqlParser.parseDataType(f.dataType),
+ f.dataType,
// Since data can be dumped in randomly with no validation, everything is nullable.
nullable = true
)(qualifier = Some(tableName))
}
/** PartitionKey attributes */
- val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
+ val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
/** Non-partitionKey attributes */
// TODO: just make this hold the schema itself, not just non-partition columns
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2392cc0bdd..ef69ac76f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,8 +43,10 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPa
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.{CircularBuffer, Utils}
/**
@@ -336,7 +338,7 @@ private[hive] class HiveClientImpl(
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
- val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+ val schema = StructType(h.getCols.asScala.map(fromHiveColumn) ++ partCols)
// Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
val unsupportedFeatures = ArrayBuffer.empty[String]
@@ -721,16 +723,22 @@ private[hive] class HiveClientImpl(
Utils.classForName(name)
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
- private def toHiveColumn(c: CatalogColumn): FieldSchema = {
- new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ private def toHiveColumn(c: StructField): FieldSchema = {
+ new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
}
- private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
- new CatalogColumn(
+ private def fromHiveColumn(hc: FieldSchema): StructField = {
+ val columnType = try {
+ CatalystSqlParser.parseDataType(hc.getType)
+ } catch {
+ case e: ParseException =>
+ throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
+ }
+ val field = StructField(
name = hc.getName,
- dataType = hc.getType,
- nullable = true,
- comment = Option(hc.getComment))
+ dataType = columnType,
+ nullable = true)
+ Option(hc.getComment).map(field.withComment).getOrElse(field)
}
private def toHiveTable(table: CatalogTable): HiveTable = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 2762e0cdd5..678bf8da73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation
@@ -65,9 +65,7 @@ case class CreateHiveTableAsSelectCommand(
val withSchema = if (withFormat.schema.isEmpty) {
// Hive doesn't support specifying the column list for target table in CTAS
// However we don't think SparkSQL should follow that.
- tableDesc.copy(schema = query.output.map { c =>
- CatalogColumn(c.name, c.dataType.catalogString)
- })
+ tableDesc.copy(schema = query.output.toStructType)
} else {
withFormat
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 5450fba753..e0c07db3b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans
import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.types.StructType
class HiveDDLCommandSuite extends PlanTest {
val parser = TestHive.sessionState.sqlParser
@@ -67,7 +68,7 @@ class HiveDDLCommandSuite extends PlanTest {
// TODO will be SQLText
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
- assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+ assert(desc.partitionColumnNames.isEmpty)
assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde ==
@@ -98,7 +99,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.comment == Some("This is the staging page view table"))
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
- assert(desc.partitionColumns == Seq.empty[CatalogColumn])
+ assert(desc.partitionColumnNames.isEmpty)
assert(desc.storage.properties == Map())
assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -114,7 +115,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
- assert(desc.schema == Seq.empty[CatalogColumn])
+ assert(desc.schema.isEmpty)
assert(desc.viewText == None) // TODO will be SQLText
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.properties == Map())
@@ -150,7 +151,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.table == "ctas2")
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.storage.locationUri == None)
- assert(desc.schema == Seq.empty[CatalogColumn])
+ assert(desc.schema.isEmpty)
assert(desc.viewText == None) // TODO will be SQLText
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.properties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
@@ -291,7 +292,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "my_table")
assert(desc.tableType == CatalogTableType.MANAGED)
- assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
+ assert(desc.schema == new StructType().add("id", "int").add("name", "string"))
assert(desc.partitionColumnNames.isEmpty)
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
@@ -342,10 +343,10 @@ class HiveDDLCommandSuite extends PlanTest {
test("create table - partitioned columns") {
val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
val (desc, _) = extractTableDesc(query)
- assert(desc.schema == Seq(
- CatalogColumn("id", "int"),
- CatalogColumn("name", "string"),
- CatalogColumn("month", "int")))
+ assert(desc.schema == new StructType()
+ .add("id", "int")
+ .add("name", "string")
+ .add("month", "int"))
assert(desc.partitionColumnNames == Seq("month"))
}
@@ -446,10 +447,10 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.identifier.database == Some("dbx"))
assert(desc.identifier.table == "my_table")
assert(desc.tableType == CatalogTableType.EXTERNAL)
- assert(desc.schema == Seq(
- CatalogColumn("id", "int"),
- CatalogColumn("name", "string"),
- CatalogColumn("month", "int")))
+ assert(desc.schema == new StructType()
+ .add("id", "int")
+ .add("name", "string")
+ .add("month", "int"))
assert(desc.partitionColumnNames == Seq("month"))
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 754aabb5ac..9d72367f43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType}
class HiveMetastoreCatalogSuite extends TestHiveSingleton {
import spark.implicits._
@@ -102,7 +102,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+ assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
checkAnswer(table("t"), testDF)
assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -135,7 +135,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
+ assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
checkAnswer(table("t"), testDF)
assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
@@ -166,7 +166,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.dataType) === Seq("int", "string"))
+ assert(columns.map(_.dataType) === Seq(IntegerType, StringType))
checkAnswer(table("t"), Row(1, "val_1"))
assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 571cae001c..c87bda9047 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -726,7 +726,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val hiveTable = CatalogTable(
identifier = TableIdentifier(tableName, Some("default")),
tableType = CatalogTableType.MANAGED,
- schema = Seq.empty,
+ schema = new StructType,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -998,7 +998,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in Hive compatible format,
// we verify that each column of the table is of native type StringType.
assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
- .forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType))
+ .forall(_.dataType == StringType))
createDataSourceTable(
sparkSession = spark,
@@ -1013,8 +1013,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in SparkSQL format,
// we verify that the table has a column type as array of StringType.
assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
- .schema.forall { c =>
- CatalystSqlParser.parseDataType(c.dataType) == ArrayType(StringType) })
+ .schema.forall(_.dataType == ArrayType(StringType)))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 066c3ffaba..a2509f2a75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
import java.io.{ByteArrayOutputStream, File, PrintStream}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -32,10 +31,11 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -146,7 +146,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
CatalogTable(
identifier = TableIdentifier(tableName, Some(database)),
tableType = CatalogTableType.MANAGED,
- schema = Seq(CatalogColumn("key", "int")),
+ schema = new StructType().add("key", "int"),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = Some(classOf[TextInputFormat].getName),