Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  310
1 file changed, 220 insertions, 90 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c7066d7363..ccc8345d73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,7 +25,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse}
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
import org.apache.hadoop.hive.ql.plan.TableDesc
@@ -40,12 +40,13 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{datasources, FileRelation}
+import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
+import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, HDFSFileCatalog}
import org.apache.spark.sql.types._
private[hive] case class HiveSerDe(
@@ -85,7 +86,18 @@ private[hive] object HiveSerDe {
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
- serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))
+ serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+ "textfile" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+
+ "avro" ->
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+ serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
val key = source.toLowerCase match {
case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
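
The two new serde registrations above ("textfile" and "avro") extend the same lookup table that already covers parquet. As a rough, self-contained illustration of that lookup (plain Scala, no Spark dependencies; the object and method names here are invented for the sketch, not part of this patch):

    object SerDeLookupSketch {
      case class HiveSerDe(
          inputFormat: Option[String] = None,
          outputFormat: Option[String] = None,
          serde: Option[String] = None)

      // Mirrors the entries added in the hunk above.
      private val serdeMap = Map(
        "textfile" -> HiveSerDe(
          inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
          outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
        "avro" -> HiveSerDe(
          inputFormat = Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
          outputFormat = Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
          serde = Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))

      def lookup(source: String): Option[HiveSerDe] = {
        // Normalize data source class names to a short key, as the real
        // `val key = source.toLowerCase match { ... }` does for parquet.
        val key = source.toLowerCase match {
          case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
          case s => s
        }
        serdeMap.get(key)
      }
    }

    // Example: SerDeLookupSketch.lookup("avro") returns the Avro serde triple;
    // lookup("sequencefile") returns None because this sketch only registers two keys.
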
@@ -102,7 +114,7 @@ private[hive] object HiveSerDe {
* Legacy catalog for interacting with the Hive metastore.
*
* This is still used for things like creating data source tables, but in the future will be
- * cleaned up to integrate more nicely with [[HiveCatalog]].
+ * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
*/
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
extends Logging {
@@ -124,8 +136,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
QualifiedTableName(
- t.name.database.getOrElse(getCurrentDatabase).toLowerCase,
- t.name.table.toLowerCase)
+ t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
+ t.identifier.table.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -299,7 +311,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
CatalogTable(
- name = TableIdentifier(tblName, Option(dbName)),
+ identifier = TableIdentifier(tblName, Option(dbName)),
tableType = tableType,
schema = Nil,
storage = CatalogStorageFormat(
@@ -319,7 +331,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
assert(relation.partitionSchema.isEmpty)
CatalogTable(
- name = TableIdentifier(tblName, Option(dbName)),
+ identifier = TableIdentifier(tblName, Option(dbName)),
tableType = tableType,
storage = CatalogStorageFormat(
locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
@@ -431,7 +443,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
alias match {
// because hive use things like `_c0` to build the expanded text
// currently we cannot support view from "create view v1(c1) as ..."
- case None => SubqueryAlias(table.name.table, hive.parseSql(viewText))
+ case None => SubqueryAlias(table.identifier.table, hive.parseSql(viewText))
case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
}
} else {
@@ -440,58 +452,72 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
}
- private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
- val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
- val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-
- val parquetOptions = Map(
- ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
- ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
- metastoreRelation.tableName,
- Some(metastoreRelation.databaseName)
- ).unquotedString
- )
- val tableIdentifier =
- QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
-
- def getCached(
- tableIdentifier: QualifiedTableName,
- pathsInMetastore: Seq[String],
- schemaInMetastore: StructType,
- partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
- cachedDataSourceTables.getIfPresent(tableIdentifier) match {
- case null => None // Cache miss
- case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) =>
- // If we have the same paths, same schema, and same partition spec,
- // we will use the cached Parquet Relation.
- val useCached =
- parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
- logical.schema.sameType(metastoreSchema) &&
- parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
- PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory])
+ private def getCached(
+ tableIdentifier: QualifiedTableName,
+ metastoreRelation: MetastoreRelation,
+ schemaInMetastore: StructType,
+ expectedFileFormat: Class[_ <: FileFormat],
+ expectedBucketSpec: Option[BucketSpec],
+ partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+
+ cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+ case null => None // Cache miss
+ case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
+ val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+ val cachedRelationFileFormatClass = relation.fileFormat.getClass
+
+ expectedFileFormat match {
+ case `cachedRelationFileFormatClass` =>
+ // If we have the same paths, same schema, and same partition spec,
+ // we will use the cached relation.
+ val useCached =
+ relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
+ logical.schema.sameType(schemaInMetastore) &&
+ relation.bucketSpec == expectedBucketSpec &&
+ relation.partitionSpec == partitionSpecInMetastore.getOrElse {
+ PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
+ }
+
+ if (useCached) {
+ Some(logical)
+ } else {
+ // If the cached relation is not updated, we invalidate it right away.
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
}
-
- if (useCached) {
- Some(logical)
- } else {
- // If the cached relation is not updated, we invalidate it right away.
+ case _ =>
+ logWarning(
+ s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
+ s"should be stored as $expectedFileFormat. However, we are getting " +
+ s"a ${relation.fileFormat} from the metastore cache. This cached " +
+ s"entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
- }
- case other =>
- logWarning(
- s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
- s"as Parquet. However, we are getting a $other from the metastore cache. " +
- s"This cached entry will be invalidated.")
- cachedDataSourceTables.invalidate(tableIdentifier)
- None
- }
+ }
+ case other =>
+ logWarning(
+ s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+ s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
+ s"This cached entry will be invalidated.")
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
}
+ }
+
+ private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
+ options: Map[String, String],
+ defaultSource: FileFormat,
+ fileFormatClass: Class[_ <: FileFormat],
+ fileType: String): LogicalRelation = {
+ val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+ val tableIdentifier =
+ QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+ val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
- // We're converting the entire table into ParquetRelation, so predicates to Hive metastore
+ // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
// are empty.
val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
val location = p.getLocation
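
The refactored getCached above reuses a cached LogicalRelation only when every identifying property still matches what the metastore reports; any mismatch invalidates the entry. A simplified, dependency-free sketch of that decision (the types are stand-ins, not Spark classes):

    case class CachedEntry(
        fileFormatClass: Class[_],
        paths: Set[String],
        schemaFields: Seq[(String, String)],   // (name, dataType), order-sensitive
        bucketSpec: Option[String],
        partitionSpec: Option[String])

    // Returns true when the cached relation can be reused; otherwise the caller
    // should invalidate the cache entry and rebuild the relation.
    def canReuse(
        cached: CachedEntry,
        expectedFormat: Class[_],
        pathsInMetastore: Set[String],
        metastoreSchema: Seq[(String, String)],
        expectedBucketSpec: Option[String],
        expectedPartitionSpec: Option[String]): Boolean = {
      cached.fileFormatClass == expectedFormat &&
        cached.paths == pathsInMetastore &&
        cached.schemaFields == metastoreSchema &&
        cached.bucketSpec == expectedBucketSpec &&
        cached.partitionSpec == expectedPartitionSpec
    }
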
@@ -504,54 +530,65 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val cached = getCached(
tableIdentifier,
- metastoreRelation.table.storage.locationUri.toSeq,
+ metastoreRelation,
metastoreSchema,
+ fileFormatClass,
+ bucketSpec,
Some(partitionSpec))
- val parquetRelation = cached.getOrElse {
+ val hadoopFsRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
- val format = new DefaultSource()
- val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
- val mergedSchema = inferredSchema.map { inferred =>
- ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
- }.getOrElse(metastoreSchema)
+ val inferredSchema = if (fileType.equals("parquet")) {
+ val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
+ inferredSchema.map { inferred =>
+ ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+ }.getOrElse(metastoreSchema)
+ } else {
+ defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
+ }
val relation = HadoopFsRelation(
sqlContext = hive,
location = fileCatalog,
partitionSchema = partitionSchema,
- dataSchema = mergedSchema,
- bucketSpec = None, // We don't support hive bucketed tables, only ones we write out.
- fileFormat = new DefaultSource(),
- options = parquetOptions)
+ dataSchema = inferredSchema,
+ bucketSpec = bucketSpec,
+ fileFormat = defaultSource,
+ options = options)
val created = LogicalRelation(relation)
cachedDataSourceTables.put(tableIdentifier, created)
created
}
- parquetRelation
+ hadoopFsRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
- val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
- val parquetRelation = cached.getOrElse {
+ val cached = getCached(tableIdentifier,
+ metastoreRelation,
+ metastoreSchema,
+ fileFormatClass,
+ bucketSpec,
+ None)
+ val logicalRelation = cached.getOrElse {
val created =
LogicalRelation(
DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
- options = parquetOptions,
- className = "parquet").resolveRelation())
+ bucketSpec = bucketSpec,
+ options = options,
+ className = fileType).resolveRelation())
cachedDataSourceTables.put(tableIdentifier, created)
created
}
- parquetRelation
+ logicalRelation
}
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}
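
Inside convertToLogicalRelation, the data schema handed to HadoopFsRelation depends on the file type: for Parquet the schema inferred from the files is reconciled with the metastore schema via ParquetRelation.mergeMetastoreParquetSchema, while other formats (ORC here) use the inferred schema directly. A minimal standalone sketch of that branch, with the merge step passed in as a placeholder function:

    // `mergeStandIn` is a placeholder for ParquetRelation.mergeMetastoreParquetSchema,
    // which reconciles the metastore schema with the Parquet file schema.
    def chooseDataSchema(
        fileType: String,
        metastoreSchema: Seq[String],
        inferredSchema: Option[Seq[String]],
        mergeStandIn: (Seq[String], Seq[String]) => Seq[String]): Seq[String] = {
      if (fileType == "parquet") {
        inferredSchema.map(mergeStandIn(metastoreSchema, _)).getOrElse(metastoreSchema)
      } else {
        // Non-Parquet formats (e.g. ORC) are expected to infer a schema from the files.
        inferredSchema.get
      }
    }
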
@@ -561,6 +598,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* data source relations for better performance.
*/
object ParquetConversions extends Rule[LogicalPlan] {
+ private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+ hive.convertMetastoreParquet
+ }
+
+ private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
+ val defaultSource = new ParquetDefaultSource()
+ val fileFormatClass = classOf[ParquetDefaultSource]
+
+ val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+ val options = Map(
+ ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+ ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+ relation.tableName,
+ Some(relation.databaseName)
+ ).unquotedString
+ )
+
+ convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
+ }
+
override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.resolved || plan.analyzed) {
return plan
@@ -570,22 +628,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// Write path
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
- if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
- r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- val parquetRelation = convertToParquetRelation(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
// Write path
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
- if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
- r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- val parquetRelation = convertToParquetRelation(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
// Read path
- case relation: MetastoreRelation if hive.convertMetastoreParquet &&
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation)
}
@@ -593,6 +646,50 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
/**
+ * When scanning Metastore ORC tables, convert them to ORC data source relations
+ * for better performance.
+ */
+ object OrcConversions extends Rule[LogicalPlan] {
+ private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+ hive.convertMetastoreOrc
+ }
+
+ private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
+ val defaultSource = new OrcDefaultSource()
+ val fileFormatClass = classOf[OrcDefaultSource]
+ val options = Map[String, String]()
+
+ convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!plan.resolved || plan.analyzed) {
+ return plan
+ }
+
+ plan transformUp {
+ // Write path
+ case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Orc data source (yet).
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+ // Write path
+ case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Orc data source (yet).
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+ // Read path
+ case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
+ val orcRelation = convertToOrcRelation(relation)
+ SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation)
+ }
+ }
+ }
+
+ /**
* Creates any tables required for query execution.
* For example, because of a CREATE TABLE X AS statement.
*/
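
The new OrcConversions rule only fires when the corresponding session flag is on. A hedged usage sketch against the HiveContext of this Spark version; the table name is made up, and the config key is inferred from the convertMetastoreOrc accessor used above, so verify it against your build:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("orc-conversion-sketch"))
    val hiveContext = new HiveContext(sc)

    // With the flag enabled, a metastore ORC table is planned as an ORC-backed
    // HadoopFsRelation (the read path of OrcConversions) instead of a Hive table scan.
    hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")
    hiveContext.sql("SELECT * FROM some_orc_table").show()
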
@@ -611,7 +708,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateViewAsSelect(
- table.copy(name = TableIdentifier(tblName, Some(dbName))),
+ table.copy(identifier = TableIdentifier(tblName, Some(dbName))),
child,
allowExisting,
replace)
@@ -633,7 +730,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
if (hive.convertCTAS && table.storage.serde.isEmpty) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
- if (table.name.database.isDefined) {
+ if (table.identifier.database.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
@@ -641,7 +738,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
- TableIdentifier(desc.name.table),
+ TableIdentifier(desc.identifier.table),
conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
@@ -662,7 +759,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
execution.CreateTableAsSelect(
- desc.copy(name = TableIdentifier(tblName, Some(dbName))),
+ desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
child,
allowExisting)
}
@@ -792,7 +889,7 @@ private[hive] case class MetastoreRelation(
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(table.name.table)
+ tTable.setTableName(table.identifier.table)
tTable.setDbName(table.database)
val tableParameters = new java.util.HashMap[String, String]()
@@ -808,8 +905,13 @@ private[hive] case class MetastoreRelation(
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tTable.setSd(sd)
- sd.setCols(table.schema.map(toHiveColumn).asJava)
- tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
+
+ // Note: In Hive the schema and partition columns must be disjoint sets
+ val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+ table.partitionColumnNames.contains(c.getName)
+ }
+ sd.setCols(schema.asJava)
+ tTable.setPartitionKeys(partCols.asJava)
table.storage.locationUri.foreach(sd.setLocation)
table.storage.inputFormat.foreach(sd.setInputFormat)
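
The split above preserves Hive's invariant that the storage descriptor columns and the partition keys are disjoint. The mechanics are just Scala's partition; a tiny self-contained example with made-up column names:

    val allColumns = Seq("id" -> "int", "value" -> "string", "ds" -> "string")
    val partitionColumnNames = Set("ds")

    val (partCols, dataCols) = allColumns.partition { case (name, _) =>
      partitionColumnNames.contains(name)
    }
    // partCols == Seq("ds" -> "string")                    -> tTable.setPartitionKeys
    // dataCols == Seq("id" -> "int", "value" -> "string")  -> sd.setCols
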
@@ -916,7 +1018,10 @@ private[hive] case class MetastoreRelation(
val partitionKeys = table.partitionColumns.map(_.toAttribute)
/** Non-partitionKey attributes */
- val attributes = table.schema.map(_.toAttribute)
+ // TODO: just make this hold the schema itself, not just non-partition columns
+ val attributes = table.schema
+ .filter { c => !table.partitionColumnNames.contains(c.name) }
+ .map(_.toAttribute)
val output = attributes ++ partitionKeys
@@ -977,3 +1082,28 @@ private[hive] object HiveMetastoreTypes {
case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
}
}
+
+private[hive] case class CreateTableAsSelect(
+ tableDesc: CatalogTable,
+ child: LogicalPlan,
+ allowExisting: Boolean) extends UnaryNode with Command {
+
+ override def output: Seq[Attribute] = Seq.empty[Attribute]
+ override lazy val resolved: Boolean =
+ tableDesc.identifier.database.isDefined &&
+ tableDesc.schema.nonEmpty &&
+ tableDesc.storage.serde.isDefined &&
+ tableDesc.storage.inputFormat.isDefined &&
+ tableDesc.storage.outputFormat.isDefined &&
+ childrenResolved
+}
+
+private[hive] case class CreateViewAsSelect(
+ tableDesc: CatalogTable,
+ child: LogicalPlan,
+ allowExisting: Boolean,
+ replace: Boolean,
+ sql: String) extends UnaryNode with Command {
+ override def output: Seq[Attribute] = Seq.empty[Attribute]
+ override lazy val resolved: Boolean = false
+}