path: root/sql
author     windpiger <songjun@outlook.com>   2017-02-11 22:21:14 -0800
committer  Xiao Li <gatorsmile@gmail.com>    2017-02-11 22:21:14 -0800
commit     3881f342b49efdb1e0d5ee27f616451ea1928c5d (patch)
tree       9e1473aa757943cc46280cf67cbb874c176c209e /sql
parent     0fbecc736df95bf757cb497c108ae3dbc5893829 (diff)
[SPARK-19448][SQL] optimize some duplicated functions between HiveClientImpl and HiveUtils
## What changes were proposed in this pull request?

There are some duplicated functions between `HiveClientImpl` and `HiveUtils`, such as `toHiveTable`, `toHivePartition`, and `fromHivePartition`; we can merge them into one place. An additional change renames `MetastoreRelation.attributes` to `MetastoreRelation.dataColKeys` (https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala#L234).

## How was this patch tested?

N/A

Author: windpiger <songjun@outlook.com>

Closes #16787 from windpiger/todoInMetaStoreRelation.
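For context, here is a minimal sketch (not part of this patch) of how callers inside the `org.apache.spark.sql.hive` package can use the consolidated converters after this change; `HiveMetadataConversionExample` and `toHiveMetadata` are hypothetical names used only for illustration:

```scala
package org.apache.spark.sql.hive  // the helpers are private[hive]

import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.hive.client.HiveClientImpl

object HiveMetadataConversionExample {
  // Convert Spark catalog metadata to Hive's representation via the shared helpers on
  // HiveClientImpl's companion object, instead of the copies removed from HiveUtils.
  def toHiveMetadata(
      catalogTable: CatalogTable,
      partition: CatalogTablePartition): (HiveTable, HivePartition) = {
    // Callers without a HiveConf (e.g. MetastoreRelation) rely on the default
    // conf = None, in which case the table owner is simply not set.
    val hiveTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
    // The consolidated signature takes the partition first and the
    // already-converted Hive table second.
    val hivePartition: HivePartition = HiveClientImpl.toHivePartition(partition, hiveTable)
    (hiveTable, hivePartition)
  }
}
```

`HiveUtils` and `MetastoreRelation` in the diff below follow the same pattern, while `HiveClientImpl` itself passes `Some(conf)` so the table owner can be set from the Hive configuration.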
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 127
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 70
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 25
7 files changed, 88 insertions(+), 157 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 13ab4e88e8..afc2bf8533 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -31,17 +31,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.internal.SQLConf
@@ -455,117 +451,6 @@ private[spark] object HiveUtils extends Logging {
case (other, tpe) if primitiveTypes contains tpe => other.toString
}
- /** Converts the native StructField to Hive's FieldSchema. */
- private def toHiveColumn(c: StructField): FieldSchema = {
- val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
- c.metadata.getString(HIVE_TYPE_STRING)
- } else {
- c.dataType.catalogString
- }
- new FieldSchema(c.name, typeString, c.getComment.orNull)
- }
-
- /** Builds the native StructField from Hive's FieldSchema. */
- private def fromHiveColumn(hc: FieldSchema): StructField = {
- val columnType = try {
- CatalystSqlParser.parseDataType(hc.getType)
- } catch {
- case e: ParseException =>
- throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
- }
-
- val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
- val field = StructField(
- name = hc.getName,
- dataType = columnType,
- nullable = true,
- metadata = metadata)
- Option(hc.getComment).map(field.withComment).getOrElse(field)
- }
-
- // TODO: merge this with HiveClientImpl#toHiveTable
- /** Converts the native table metadata representation format CatalogTable to Hive's Table. */
- def toHiveTable(catalogTable: CatalogTable): HiveTable = {
- // We start by constructing an API table as Hive performs several important transformations
- // internally when converting an API table to a QL table.
- val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(catalogTable.identifier.table)
- tTable.setDbName(catalogTable.database)
-
- val tableParameters = new java.util.HashMap[String, String]()
- tTable.setParameters(tableParameters)
- catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
- tTable.setTableType(catalogTable.tableType match {
- case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
- case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
- case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
- })
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tTable.setSd(sd)
-
- // Note: In Hive the schema and partition columns must be disjoint sets
- val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
- catalogTable.partitionColumnNames.contains(c.getName)
- }
- sd.setCols(schema.asJava)
- tTable.setPartitionKeys(partCols.asJava)
-
- catalogTable.storage.locationUri.foreach(sd.setLocation)
- catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
- catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
- sd.setSerdeInfo(serdeInfo)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
- serdeInfo.setParameters(serdeParameters)
-
- new HiveTable(tTable)
- }
-
- /**
- * Converts the native partition metadata representation format CatalogTablePartition to
- * Hive's Partition.
- */
- def toHivePartition(
- catalogTable: CatalogTable,
- hiveTable: HiveTable,
- partition: CatalogTablePartition): HivePartition = {
- val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
- tPartition.setDbName(catalogTable.database)
- tPartition.setTableName(catalogTable.identifier.table)
- tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tPartition.setSd(sd)
-
- // Note: In Hive the schema and partition columns must be disjoint sets
- val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
- !catalogTable.partitionColumnNames.contains(c.getName)
- }
- sd.setCols(schema.asJava)
-
- partition.storage.locationUri.foreach(sd.setLocation)
- partition.storage.inputFormat.foreach(sd.setInputFormat)
- partition.storage.outputFormat.foreach(sd.setOutputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- sd.setSerdeInfo(serdeInfo)
- // maps and lists should be set only after all elements are ready (see HIVE-7975)
- partition.storage.serde.foreach(serdeInfo.setSerializationLib)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
- partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
- serdeInfo.setParameters(serdeParameters)
-
- new HivePartition(hiveTable, tPartition)
- }
-
/**
* Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema.
* When the tables are data source tables or the schema already exists, returns the original
@@ -575,12 +460,12 @@ private[spark] object HiveUtils extends Logging {
if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) {
table
} else {
- val hiveTable = toHiveTable(table)
+ val hiveTable = HiveClientImpl.toHiveTable(table)
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
- val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
- val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
- table.copy(schema = schema)
+ val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn)
+ val dataCols = hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn)
+ table.copy(schema = StructType(dataCols ++ partCols))
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 6394eb6da5..97b120758b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types.StructField
@@ -56,7 +57,7 @@ private[hive] case class MetastoreRelation(
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
- @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable)
+ @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
@transient override def computeStats(conf: CatalystConf): Statistics = {
catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
@@ -111,7 +112,8 @@ private[hive] case class MetastoreRelation(
} else {
allPartitions
}
- rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _))
+
+ rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
}
/** Only compare database and tablename, not alias. */
@@ -146,18 +148,17 @@ private[hive] case class MetastoreRelation(
val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
/** Non-partitionKey attributes */
- // TODO: just make this hold the schema itself, not just non-partition columns
- val attributes = catalogTable.schema
+ val dataColKeys = catalogTable.schema
.filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
.map(_.toAttribute)
- val output = attributes ++ partitionKeys
+ val output = dataColKeys ++ partitionKeys
/** An attribute map that can be used to lookup original attributes based on expression id. */
val attributeMap = AttributeMap(output.map(o => (o, o)))
/** An attribute map for determining the ordinal for non-partition columns. */
- val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+ val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex)
override def inputFiles: Array[String] = {
val partLocations = allPartitions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f0d01ebfcf..dc9c3ff335 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -435,7 +436,7 @@ private[hive] class HiveClientImpl(
}
override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
- client.createTable(toHiveTable(table), ignoreIfExists)
+ client.createTable(toHiveTable(table, Some(conf)), ignoreIfExists)
}
override def dropTable(
@@ -447,7 +448,7 @@ private[hive] class HiveClientImpl(
}
override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
client.alterTable(qualifiedTableName, hiveTable)
@@ -516,7 +517,7 @@ private[hive] class HiveClientImpl(
newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
val catalogTable = getTable(db, table)
- val hiveTable = toHiveTable(catalogTable)
+ val hiveTable = toHiveTable(catalogTable, Some(conf))
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
val hivePart = getPartitionOption(catalogTable, oldSpec)
.map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
@@ -529,7 +530,7 @@ private[hive] class HiveClientImpl(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
- val hiveTable = toHiveTable(getTable(db, table))
+ val hiveTable = toHiveTable(getTable(db, table), Some(conf))
client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
}
@@ -557,7 +558,7 @@ private[hive] class HiveClientImpl(
override def getPartitionOption(
table: CatalogTable,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
Option(hivePartition).map(fromHivePartition)
}
@@ -569,7 +570,7 @@ private[hive] class HiveClientImpl(
override def getPartitions(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) =>
@@ -583,7 +584,7 @@ private[hive] class HiveClientImpl(
override def getPartitionsByFilter(
table: CatalogTable,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
@@ -776,20 +777,11 @@ private[hive] class HiveClientImpl(
client.dropDatabase(db, true, false, true)
}
}
+}
-
- /* -------------------------------------------------------- *
- | Helper methods for converting to and from Hive classes |
- * -------------------------------------------------------- */
-
- private def toInputFormat(name: String) =
- Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
- private def toOutputFormat(name: String) =
- Utils.classForName(name)
- .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
- private def toHiveColumn(c: StructField): FieldSchema = {
+private[hive] object HiveClientImpl {
+ /** Converts the native StructField to Hive's FieldSchema. */
+ def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
c.metadata.getString(HIVE_TYPE_STRING)
} else {
@@ -798,7 +790,8 @@ private[hive] class HiveClientImpl(
new FieldSchema(c.name, typeString, c.getComment().orNull)
}
- private def fromHiveColumn(hc: FieldSchema): StructField = {
+ /** Builds the native StructField from Hive's FieldSchema. */
+ def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = try {
CatalystSqlParser.parseDataType(hc.getType)
} catch {
@@ -815,7 +808,19 @@ private[hive] class HiveClientImpl(
Option(hc.getComment).map(field.withComment).getOrElse(field)
}
- private def toHiveTable(table: CatalogTable): HiveTable = {
+ private def toInputFormat(name: String) =
+ Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+ private def toOutputFormat(name: String) =
+ Utils.classForName(name)
+ .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+ /**
+ * Converts the native table metadata representation format CatalogTable to Hive's Table.
+ */
+ def toHiveTable(
+ table: CatalogTable,
+ conf: Option[HiveConf] = None): HiveTable = {
val hiveTable = new HiveTable(table.database, table.identifier.table)
// For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
// Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
@@ -832,7 +837,9 @@ private[hive] class HiveClientImpl(
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
table.partitionColumnNames.contains(c.getName)
}
- if (schema.isEmpty) {
+ // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
+ // so here we should not add a default col schema
+ if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -845,10 +852,10 @@ private[hive] class HiveClientImpl(
hiveTable.setFields(schema.asJava)
}
hiveTable.setPartCols(partCols.asJava)
- hiveTable.setOwner(conf.getUser)
+ conf.foreach(c => hiveTable.setOwner(c.getUser))
hiveTable.setCreateTime((table.createTime / 1000).toInt)
hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
- table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
+ table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)}
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
hiveTable.setSerializationLib(
@@ -866,7 +873,11 @@ private[hive] class HiveClientImpl(
hiveTable
}
- private def toHivePartition(
+ /**
+ * Converts the native partition metadata representation format CatalogTablePartition to
+ * Hive's Partition.
+ */
+ def toHivePartition(
p: CatalogTablePartition,
ht: HiveTable): HivePartition = {
val tpart = new org.apache.hadoop.hive.metastore.api.Partition
@@ -891,7 +902,10 @@ private[hive] class HiveClientImpl(
new HivePartition(ht, tpart)
}
- private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+ /**
+ * Build the native partition metadata from Hive's Partition.
+ */
+ def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
val apiPartition = hp.getTPartition
CatalogTablePartition(
spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index def6ef3691..140c352fa6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -113,7 +113,7 @@ case class HiveTableScanExec(
.mkString(",")
hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
- hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
+ hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(","))
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index 00fdfbcebb..ee632d24b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -134,6 +134,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
storage = CatalogStorageFormat.empty.copy(
properties = Map("path" -> defaultTableURI("tbl4").toString)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema.numParts" -> "1",
@@ -145,6 +146,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
storage = CatalogStorageFormat.empty.copy(
properties = Map("path" -> defaultTableURI("tbl5").toString)),
schema = simpleSchema,
+ provider = Some("parquet"),
properties = Map(
"spark.sql.sources.provider" -> "parquet",
"spark.sql.sources.schema.numParts" -> "1",
@@ -156,6 +158,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
storage = CatalogStorageFormat.empty.copy(
properties = Map("path" -> defaultTableURI("tbl6").toString)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema.numParts" -> "1",
@@ -170,6 +173,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"),
properties = Map("path" -> tempDirUri)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema.numParts" -> "1",
@@ -194,6 +198,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"),
properties = Map("path" -> tempDirUri)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map("spark.sql.sources.provider" -> "json"))
// A list of all raw tables we want to test, with their expected schema.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index cf1fe2bc70..e951bbe1dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -748,6 +748,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
identifier = TableIdentifier(tableName, Some("default")),
tableType = CatalogTableType.MANAGED,
schema = new StructType,
+ provider = Some("json"),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -1276,6 +1277,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
identifier = TableIdentifier("t", Some("default")),
tableType = CatalogTableType.MANAGED,
schema = new StructType,
+ provider = Some("json"),
storage = CatalogStorageFormat.empty,
properties = Map(
DATASOURCE_PROVIDER -> "json",
@@ -1373,6 +1375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
properties = Map("path" -> path.getAbsolutePath)
),
schema = new StructType(),
+ provider = Some("parquet"),
properties = Map(
HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
hiveClient.createTable(tableDesc, ignoreIfExists = false)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index ca39c7e845..fe14824cf0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{ByteArrayOutputStream, File, PrintStream}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -570,7 +571,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
}
}
-
test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
withTempDir { dir =>
val path = dir.toURI.toString
@@ -649,6 +649,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
}
}
+ test(s"$version: CTAS for managed data source tables") {
+ withTable("t", "t1") {
+ import spark.implicits._
+
+ val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
+ Seq("1").toDF("a").write.saveAsTable("t")
+ val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+
+ assert(table.location.stripSuffix("/") == expectedPath)
+ assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
+ checkAnswer(spark.table("t"), Row("1") :: Nil)
+
+ val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
+ spark.sql("create table t1 using parquet as select 2 as a")
+ val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
+
+ assert(table1.location.stripSuffix("/") == expectedPath1)
+ assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
+ checkAnswer(spark.table("t1"), Row(2) :: Nil)
+ }
+ }
// TODO: add more tests.
}
}