author      Tejas Patil <tejasp@fb.com>                  2016-04-01 13:13:16 -0700
committer   Michael Armbrust <michael@databricks.com>    2016-04-01 13:13:16 -0700
commit      1e886159849e3918445d3fdc3c4cef86c6c1a236 (patch)
tree        57b47e41c4944b57c16f93ce7b71d59f84e6733f /sql/hive/src/main
parent      a884daad805a701494e87393dc307937472a985d (diff)
download    spark-1e886159849e3918445d3fdc3c4cef86c6c1a236.tar.gz
            spark-1e886159849e3918445d3fdc3c4cef86c6c1a236.tar.bz2
            spark-1e886159849e3918445d3fdc3c4cef86c6c1a236.zip
[SPARK-14070][SQL] Use ORC data source for SQL queries on ORC tables
## What changes were proposed in this pull request?

This patch enables use of `OrcRelation` for SQL queries which read data from Hive tables. Changes in this patch:

- Added a new rule `OrcConversions` which alters the plan to use `OrcRelation`. In this diff, the conversion is done only for reads.
- Added a new config `spark.sql.hive.convertMetastoreOrc` to control the conversion.

BEFORE

```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None

== Analyzed Logical Plan ==
key: string, value: string
Project [key#171,value#172]
+- MetastoreRelation default, orc_table, None

== Optimized Logical Plan ==
MetastoreRelation default, orc_table, None

== Physical Plan ==
HiveTableScan [key#171,value#172], MetastoreRelation default, orc_table, None
```

AFTER

```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None

== Analyzed Logical Plan ==
key: string, value: string
Project [key#76,value#77]
+- SubqueryAlias orc_table
   +- Relation[key#76,value#77] ORC part: struct<>, data: struct<key:string,value:string>

== Optimized Logical Plan ==
Relation[key#76,value#77] ORC part: struct<>, data: struct<key:string,value:string>

== Physical Plan ==
WholeStageCodegen
:  +- Scan ORC part: struct<>, data: struct<key:string,value:string>[key#76,value#77] InputPaths: file:/user/hive/warehouse/orc_table
```

## How was this patch tested?

- Added a new unit test; ran existing unit tests.
- Ran with production-like data.

## Performance gains

Ran on a production table at Facebook (note that the data was in the DWRF file format, which is similar to ORC).

Best case: no rows matched the predicate in the query (everything was filtered out).

```
                     CPU time      Wall time   Total wall time across all tasks
================================================================================
Without the change   541,515 sec   25.0 mins   165.8 hours
With the change          407 sec    1.5 mins   15 mins
```

Average case: a subset of rows in the data matched the query predicate.

```
                     CPU time      Wall time   Total wall time across all tasks
================================================================================
Without the change   624,630 sec   31.0 mins   199.0 hours
With the change       14,769 sec    5.3 mins     7.7 hours
```

Author: Tejas Patil <tejasp@fb.com>

Closes #11891 from tejasapatil/orc_ppd.
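To try the new read path interactively, the sketch below can be run in a Hive-enabled spark-shell. It is not part of the patch; the `hqlContext` handle and the `orc_table` fixture simply follow the naming used in the explain output above.

```scala
// The conversion is on by default (spark.sql.hive.convertMetastoreOrc = true),
// so this query should plan a data source ORC scan instead of a HiveTableScan.
hqlContext.sql("SELECT * FROM orc_table").explain(true)

// Turning the flag off falls back to the Hive SerDe read path for comparison.
hqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
hqlContext.sql("SELECT * FROM orc_table").explain(true)
```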
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--   sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala            12
-rw-r--r--   sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala   234
-rw-r--r--   sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala       1
-rw-r--r--   sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala         1
4 files changed, 174 insertions, 74 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c0b6d16d3c..073b954a5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -155,6 +155,13 @@ class HiveContext private[hive](
getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
/**
+ * When true, enables an experimental feature where metastore tables that use the Orc SerDe
+ * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
+ * SerDe.
+ */
+ protected[sql] def convertMetastoreOrc: Boolean = getConf(CONVERT_METASTORE_ORC)
+
+ /**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in CTAS statement will be converted when it meets any of the following conditions:
@@ -442,6 +449,11 @@ private[hive] object HiveContext extends Logging {
"different Parquet data files. This configuration is only effective " +
"when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
+ defaultValue = Some(true),
+ doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ "the built in support.")
+
val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
defaultValue = Some(false),
doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cdc931c3f..14f331961e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,12 +40,13 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{datasources, FileRelation}
+import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
+import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, HDFSFileCatalog}
import org.apache.spark.sql.types._
private[hive] case class HiveSerDe(
@@ -451,58 +452,72 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
}
- private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
- val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
- val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-
- val parquetOptions = Map(
- ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
- ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
- metastoreRelation.tableName,
- Some(metastoreRelation.databaseName)
- ).unquotedString
- )
- val tableIdentifier =
- QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
-
- def getCached(
- tableIdentifier: QualifiedTableName,
- pathsInMetastore: Seq[String],
- schemaInMetastore: StructType,
- partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
- cachedDataSourceTables.getIfPresent(tableIdentifier) match {
- case null => None // Cache miss
- case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) =>
- // If we have the same paths, same schema, and same partition spec,
- // we will use the cached Parquet Relation.
- val useCached =
- parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
- logical.schema.sameType(metastoreSchema) &&
- parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
- PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory])
+ private def getCached(
+ tableIdentifier: QualifiedTableName,
+ metastoreRelation: MetastoreRelation,
+ schemaInMetastore: StructType,
+ expectedFileFormat: Class[_ <: FileFormat],
+ expectedBucketSpec: Option[BucketSpec],
+ partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+
+ cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+ case null => None // Cache miss
+ case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
+ val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+ val cachedRelationFileFormatClass = relation.fileFormat.getClass
+
+ expectedFileFormat match {
+ case `cachedRelationFileFormatClass` =>
+ // If we have the same paths, same schema, and same partition spec,
+ // we will use the cached relation.
+ val useCached =
+ relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
+ logical.schema.sameType(schemaInMetastore) &&
+ relation.bucketSpec == expectedBucketSpec &&
+ relation.partitionSpec == partitionSpecInMetastore.getOrElse {
+ PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
+ }
+
+ if (useCached) {
+ Some(logical)
+ } else {
+ // If the cached relation is not updated, we invalidate it right away.
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
}
-
- if (useCached) {
- Some(logical)
- } else {
- // If the cached relation is not updated, we invalidate it right away.
+ case _ =>
+ logWarning(
+ s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
+ s"should be stored as $expectedFileFormat. However, we are getting " +
+ s"a ${relation.fileFormat} from the metastore cache. This cached " +
+ s"entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
- }
- case other =>
- logWarning(
- s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
- s"as Parquet. However, we are getting a $other from the metastore cache. " +
- s"This cached entry will be invalidated.")
- cachedDataSourceTables.invalidate(tableIdentifier)
- None
- }
+ }
+ case other =>
+ logWarning(
+ s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+ s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
+ s"This cached entry will be invalidated.")
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
}
+ }
+
+ private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
+ options: Map[String, String],
+ defaultSource: FileFormat,
+ fileFormatClass: Class[_ <: FileFormat],
+ fileType: String): LogicalRelation = {
+ val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+ val tableIdentifier =
+ QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+ val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
- // We're converting the entire table into ParquetRelation, so predicates to Hive metastore
+ // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
// are empty.
val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
val location = p.getLocation
@@ -515,54 +530,65 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val cached = getCached(
tableIdentifier,
- metastoreRelation.table.storage.locationUri.toSeq,
+ metastoreRelation,
metastoreSchema,
+ fileFormatClass,
+ bucketSpec,
Some(partitionSpec))
- val parquetRelation = cached.getOrElse {
+ val hadoopFsRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
- val format = new DefaultSource()
- val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
- val mergedSchema = inferredSchema.map { inferred =>
- ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
- }.getOrElse(metastoreSchema)
+ val inferredSchema = if (fileType.equals("parquet")) {
+ val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
+ inferredSchema.map { inferred =>
+ ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+ }.getOrElse(metastoreSchema)
+ } else {
+ defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
+ }
val relation = HadoopFsRelation(
sqlContext = hive,
location = fileCatalog,
partitionSchema = partitionSchema,
- dataSchema = mergedSchema,
- bucketSpec = None, // We don't support hive bucketed tables, only ones we write out.
- fileFormat = new DefaultSource(),
- options = parquetOptions)
+ dataSchema = inferredSchema,
+ bucketSpec = bucketSpec,
+ fileFormat = defaultSource,
+ options = options)
val created = LogicalRelation(relation)
cachedDataSourceTables.put(tableIdentifier, created)
created
}
- parquetRelation
+ hadoopFsRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
- val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
- val parquetRelation = cached.getOrElse {
+ val cached = getCached(tableIdentifier,
+ metastoreRelation,
+ metastoreSchema,
+ fileFormatClass,
+ bucketSpec,
+ None)
+ val logicalRelation = cached.getOrElse {
val created =
LogicalRelation(
DataSource(
sqlContext = hive,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
- options = parquetOptions,
- className = "parquet").resolveRelation())
+ bucketSpec = bucketSpec,
+ options = options,
+ className = fileType).resolveRelation())
cachedDataSourceTables.put(tableIdentifier, created)
created
}
- parquetRelation
+ logicalRelation
}
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}
@@ -572,6 +598,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
* data source relations for better performance.
*/
object ParquetConversions extends Rule[LogicalPlan] {
+ private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+ hive.convertMetastoreParquet
+ }
+
+ private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
+ val defaultSource = new ParquetDefaultSource()
+ val fileFormatClass = classOf[ParquetDefaultSource]
+
+ val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+ val options = Map(
+ ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+ ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+ relation.tableName,
+ Some(relation.databaseName)
+ ).unquotedString
+ )
+
+ convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
+ }
+
override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.resolved || plan.analyzed) {
return plan
@@ -581,22 +628,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// Write path
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
- if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
- r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- val parquetRelation = convertToParquetRelation(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
// Write path
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
- if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
- r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
- val parquetRelation = convertToParquetRelation(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
// Read path
- case relation: MetastoreRelation if hive.convertMetastoreParquet &&
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation)
}
@@ -604,6 +646,50 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
/**
+ * When scanning Metastore ORC tables, convert them to ORC data source relations
+ * for better performance.
+ */
+ object OrcConversions extends Rule[LogicalPlan] {
+ private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+ hive.convertMetastoreOrc
+ }
+
+ private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
+ val defaultSource = new OrcDefaultSource()
+ val fileFormatClass = classOf[OrcDefaultSource]
+ val options = Map[String, String]()
+
+ convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!plan.resolved || plan.analyzed) {
+ return plan
+ }
+
+ plan transformUp {
+ // Write path
+ case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Orc data source (yet).
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+ // Write path
+ case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Orc data source (yet).
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+ // Read path
+ case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
+ val orcRelation = convertToOrcRelation(relation)
+ SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation)
+ }
+ }
+ }
+
+ /**
* Creates any tables required for query execution.
* For example, because of a CREATE TABLE X AS statement.
*/
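A sanity check in the spirit of the unit test mentioned in the commit message could look like the sketch below; the table name `orc_check` and the plan-string assertion are illustrative assumptions, not the test that was actually added.

```scala
// Create a plain Hive ORC table so the metastore reports the ORC SerDe.
hqlContext.sql(
  "CREATE TABLE IF NOT EXISTS orc_check (key STRING, value STRING) STORED AS ORC")

// With spark.sql.hive.convertMetastoreOrc=true, the OrcConversions rule should
// swap the MetastoreRelation for an ORC data source relation, so the physical
// plan must not contain a HiveTableScan.
val physical = hqlContext.sql("SELECT * FROM orc_check").queryExecution.executedPlan.toString
assert(!physical.contains("HiveTableScan"),
  s"expected an ORC data source scan but got:\n$physical")
```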
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 1cd783e63a..dfbf22cc47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -74,6 +74,7 @@ class HiveSessionCatalog(
private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
+ val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 11ef0fd1bb..2bdb428e9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -57,6 +57,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
+ catalog.OrcConversions ::
catalog.CreateTables ::
catalog.PreInsertionCasts ::
python.ExtractPythonUDFs ::