From 1e886159849e3918445d3fdc3c4cef86c6c1a236 Mon Sep 17 00:00:00 2001
From: Tejas Patil
Date: Fri, 1 Apr 2016 13:13:16 -0700
Subject: [SPARK-14070][SQL] Use ORC data source for SQL queries on ORC tables

## What changes were proposed in this pull request?

This patch enables use of `OrcRelation` for SQL queries which read data from ORC-backed Hive tables. Changes in this patch:

- Added a new rule `OrcConversions` which alters the plan to use `OrcRelation`. In this diff, the conversion is done only for reads.
- Added a new config `spark.sql.hive.convertMetastoreOrc` to control the conversion.

BEFORE

```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None

== Analyzed Logical Plan ==
key: string, value: string
Project [key#171,value#172]
+- MetastoreRelation default, orc_table, None

== Optimized Logical Plan ==
MetastoreRelation default, orc_table, None

== Physical Plan ==
HiveTableScan [key#171,value#172], MetastoreRelation default, orc_table, None
```

AFTER

```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None

== Analyzed Logical Plan ==
key: string, value: string
Project [key#76,value#77]
+- SubqueryAlias orc_table
   +- Relation[key#76,value#77] ORC part: struct<>, data: struct

== Optimized Logical Plan ==
Relation[key#76,value#77] ORC part: struct<>, data: struct

== Physical Plan ==
WholeStageCodegen
:  +- Scan ORC part: struct<>, data: struct[key#76,value#77] InputPaths: file:/user/hive/warehouse/orc_table
```

## How was this patch tested?

- Added a new unit test and ran the existing unit tests.
- Ran with production-like data.

## Performance gains

Ran on a production table at Facebook (note that the data was in the DWRF file format, which is similar to ORC).

Best case: no rows match the query predicate, so everything is filtered out.

```
                     CPU time      Wall time    Total wall time across all tasks
=================================================================================
Without the change   541,515 sec   25.0 mins    165.8 hours
With the change      407 sec       1.5 mins     15 mins
```

Average case: a subset of rows in the data matches the query predicate.

```
                     CPU time      Wall time    Total wall time across all tasks
=================================================================================
Without the change   624,630 sec   31.0 mins    199.0 hours
With the change      14,769 sec    5.3 mins     7.7 hours
```

Author: Tejas Patil

Closes #11891 from tejasapatil/orc_ppd.
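For illustration, a minimal usage sketch of the new flag (not part of the patch; `hqlContext` and `orc_table` are the names already used in the plans above):

```scala
// Hedged usage sketch, not part of the patch: the new flag switches reads of
// metastore ORC tables between the Hive SerDe path and the ORC data source path.
// Assumes `hqlContext` is a HiveContext and `orc_table` is an existing ORC table.
hqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
hqlContext.sql("SELECT * FROM orc_table").explain(true)   // plans a HiveTableScan, as before

hqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")  // the default after this patch
hqlContext.sql("SELECT * FROM orc_table").explain(true)   // plans a scan over the ORC data source
```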
---
 .../org/apache/spark/sql/hive/HiveContext.scala    |  12 ++
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 234 ++++++++++++++-------
 .../apache/spark/sql/hive/HiveSessionCatalog.scala |   1 +
 .../apache/spark/sql/hive/HiveSessionState.scala   |   1 +
 4 files changed, 174 insertions(+), 74 deletions(-)
(limited to 'sql/hive/src/main')

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c0b6d16d3c..073b954a5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -154,6 +154,13 @@ class HiveContext private[hive](
   protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean =
     getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
 
+  /**
+   * When true, enables an experimental feature where metastore tables that use the Orc SerDe
+   * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
+   * SerDe.
+   */
+  protected[sql] def convertMetastoreOrc: Boolean = getConf(CONVERT_METASTORE_ORC)
+
   /**
    * When true, a table created by a Hive CTAS statement (no USING clause) will be
    * converted to a data source table, using the data source set by spark.sql.sources.default.
@@ -442,6 +449,11 @@ private[hive] object HiveContext extends Logging {
       "different Parquet data files. This configuration is only effective " +
         "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
 
+  val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
+    defaultValue = Some(true),
+    doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+      "the built in support.")
+
   val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
     defaultValue = Some(false),
     doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cdc931c3f..14f331961e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,12 +40,13 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{datasources, FileRelation}
+import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
+import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, HDFSFileCatalog}
 import org.apache.spark.sql.types._
 
 private[hive] case class HiveSerDe(
@@ -451,58 +452,72 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     }
   }
 
-  private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
-    val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-
-    val parquetOptions = Map(
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
-      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
-        metastoreRelation.tableName,
-        Some(metastoreRelation.databaseName)
-      ).unquotedString
-    )
-    val tableIdentifier =
-      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
-
-    def getCached(
-        tableIdentifier: QualifiedTableName,
-        pathsInMetastore: Seq[String],
-        schemaInMetastore: StructType,
-        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
-      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
-        case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) =>
-          // If we have the same paths, same schema, and same partition spec,
-          // we will use the cached Parquet Relation.
-          val useCached =
-            parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
-            logical.schema.sameType(metastoreSchema) &&
-            parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
-              PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory])
+  private def getCached(
+      tableIdentifier: QualifiedTableName,
+      metastoreRelation: MetastoreRelation,
+      schemaInMetastore: StructType,
+      expectedFileFormat: Class[_ <: FileFormat],
+      expectedBucketSpec: Option[BucketSpec],
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+
+    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+      case null => None // Cache miss
+      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
+        val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+        val cachedRelationFileFormatClass = relation.fileFormat.getClass
+
+        expectedFileFormat match {
+          case `cachedRelationFileFormatClass` =>
+            // If we have the same paths, same schema, and same partition spec,
+            // we will use the cached relation.
+            val useCached =
+              relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
+                logical.schema.sameType(schemaInMetastore) &&
+                relation.bucketSpec == expectedBucketSpec &&
+                relation.partitionSpec == partitionSpecInMetastore.getOrElse {
+                  PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
+                }
+
+            if (useCached) {
+              Some(logical)
+            } else {
+              // If the cached relation is not updated, we invalidate it right away.
+              cachedDataSourceTables.invalidate(tableIdentifier)
+              None
             }
-
-          if (useCached) {
-            Some(logical)
-          } else {
-            // If the cached relation is not updated, we invalidate it right away.
+          case _ =>
+            logWarning(
+              s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
+                s"should be stored as $expectedFileFormat. However, we are getting " +
+                s"a ${relation.fileFormat} from the metastore cache. This cached " +
+                s"entry will be invalidated.")
             cachedDataSourceTables.invalidate(tableIdentifier)
             None
-          }
-        case other =>
-          logWarning(
-            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
-              s"as Parquet. However, we are getting a $other from the metastore cache. " +
-              s"This cached entry will be invalidated.")
-          cachedDataSourceTables.invalidate(tableIdentifier)
-          None
-      }
+        }
+      case other =>
+        logWarning(
+          s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+            s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
" + + s"This cached entry will be invalidated.") + cachedDataSourceTables.invalidate(tableIdentifier) + None } + } + + private def convertToLogicalRelation(metastoreRelation: MetastoreRelation, + options: Map[String, String], + defaultSource: FileFormat, + fileFormatClass: Class[_ <: FileFormat], + fileType: String): LogicalRelation = { + val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + val tableIdentifier = + QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) + val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into ParquetRelation, so predicates to Hive metastore + // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore // are empty. val partitions = metastoreRelation.getHiveQlPartitions().map { p => val location = p.getLocation @@ -515,54 +530,65 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val cached = getCached( tableIdentifier, - metastoreRelation.table.storage.locationUri.toSeq, + metastoreRelation, metastoreSchema, + fileFormatClass, + bucketSpec, Some(partitionSpec)) - val parquetRelation = cached.getOrElse { + val hadoopFsRelation = cached.getOrElse { val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec) - val format = new DefaultSource() - val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles()) - val mergedSchema = inferredSchema.map { inferred => - ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) + val inferredSchema = if (fileType.equals("parquet")) { + val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles()) + inferredSchema.map { inferred => + ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred) + }.getOrElse(metastoreSchema) + } else { + defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get + } val relation = HadoopFsRelation( sqlContext = hive, location = fileCatalog, partitionSchema = partitionSchema, - dataSchema = mergedSchema, - bucketSpec = None, // We don't support hive bucketed tables, only ones we write out. 
-          fileFormat = new DefaultSource(),
-          options = parquetOptions)
+          dataSchema = inferredSchema,
+          bucketSpec = bucketSpec,
+          fileFormat = defaultSource,
+          options = options)
 
         val created = LogicalRelation(relation)
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
-      parquetRelation
+      hadoopFsRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
-      val parquetRelation = cached.getOrElse {
+      val cached = getCached(tableIdentifier,
+        metastoreRelation,
+        metastoreSchema,
+        fileFormatClass,
+        bucketSpec,
+        None)
+      val logicalRelation = cached.getOrElse {
         val created = LogicalRelation(
           DataSource(
             sqlContext = hive,
             paths = paths,
            userSpecifiedSchema = Some(metastoreRelation.schema),
-            options = parquetOptions,
-            className = "parquet").resolveRelation())
+            bucketSpec = bucketSpec,
+            options = options,
+            className = fileType).resolveRelation())
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
-      parquetRelation
+      logicalRelation
     }
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
@@ -572,6 +598,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
    * data source relations for better performance.
    */
   object ParquetConversions extends Rule[LogicalPlan] {
+    private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
+      relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+        hive.convertMetastoreParquet
+    }
+
+    private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
+      val defaultSource = new ParquetDefaultSource()
+      val fileFormatClass = classOf[ParquetDefaultSource]
+
+      val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+      val options = Map(
+        ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+        ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+          relation.tableName,
+          Some(relation.databaseName)
+        ).unquotedString
+      )
+
+      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
+    }
+
     override def apply(plan: LogicalPlan): LogicalPlan = {
       if (!plan.resolved || plan.analyzed) {
         return plan
       }
@@ -581,28 +628,67 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         // Write path
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
-            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
 
         // Write path
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
-            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
 
         // Read path
-        case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
          val parquetRelation = convertToParquetRelation(relation)
          SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation)
      }
    }
  }
 
+  /**
+   * When scanning Metastore ORC tables, convert them to ORC data source relations
+   * for better performance.
+   */
+  object OrcConversions extends Rule[LogicalPlan] {
+    private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
+      relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+        hive.convertMetastoreOrc
+    }
+
+    private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
+      val defaultSource = new OrcDefaultSource()
+      val fileFormatClass = classOf[OrcDefaultSource]
+      val options = Map[String, String]()
+
+      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
+    }
+
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      if (!plan.resolved || plan.analyzed) {
+        return plan
+      }
+
+      plan transformUp {
+        // Write path
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data source (yet).
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+        // Write path
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data source (yet).
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+        // Read path
+        case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
+          val orcRelation = convertToOrcRelation(relation)
+          SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation)
+      }
+    }
+  }
+
  /**
   * Creates any tables required for query execution.
   * For example, because of a CREATE TABLE X AS statement.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 1cd783e63a..dfbf22cc47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -74,6 +74,7 @@ class HiveSessionCatalog(
   private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
 
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
+  val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
   val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
   val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 11ef0fd1bb..2bdb428e9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -57,6 +57,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
     new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
+        catalog.OrcConversions ::
        catalog.CreateTables ::
        catalog.PreInsertionCasts ::
        python.ExtractPythonUDFs ::
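A rough sketch of the kind of check the new unit test performs on the read path (an assumption-laden illustration, not the actual test added by this patch; it presumes the TestHive context and an existing ORC table named `orc_table`):

```scala
// Hedged sketch of a conversion check; not the actual unit test added by this patch.
// Assumes org.apache.spark.sql.hive.test.TestHive and an existing ORC table `orc_table`.
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHive

val analyzed = TestHive.sql("SELECT key, value FROM orc_table").queryExecution.analyzed
// With spark.sql.hive.convertMetastoreOrc=true the read should be planned through the
// ORC data source (a LogicalRelation) rather than staying a MetastoreRelation.
val converted = analyzed.collect { case LogicalRelation(relation, _, _) => relation }
assert(converted.nonEmpty, "expected the ORC table to be converted to the ORC data source")
```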