diff options
author | Tejas Patil <tejasp@fb.com> | 2016-04-01 13:13:16 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-04-01 13:13:16 -0700 |
commit | 1e886159849e3918445d3fdc3c4cef86c6c1a236 (patch) | |
tree | 57b47e41c4944b57c16f93ce7b71d59f84e6733f /sql/hive/compatibility | |
parent | a884daad805a701494e87393dc307937472a985d (diff) | |
download | spark-1e886159849e3918445d3fdc3c4cef86c6c1a236.tar.gz spark-1e886159849e3918445d3fdc3c4cef86c6c1a236.tar.bz2 spark-1e886159849e3918445d3fdc3c4cef86c6c1a236.zip |
[SPARK-14070][SQL] Use ORC data source for SQL queries on ORC tables
## What changes were proposed in this pull request?
This patch enables use of OrcRelation for SQL queries which read data from Hive tables. Changes in this patch:
- Added a new rule `OrcConversions` which would alter the plan to use `OrcRelation`. In this diff, the conversion is done only for reads.
- Added a new config `spark.sql.hive.convertMetastoreOrc` to control the conversion
BEFORE
```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None
== Analyzed Logical Plan ==
key: string, value: string
Project [key#171,value#172]
+- MetastoreRelation default, orc_table, None
== Optimized Logical Plan ==
MetastoreRelation default, orc_table, None
== Physical Plan ==
HiveTableScan [key#171,value#172], MetastoreRelation default, orc_table, None
```
AFTER
```
scala> hqlContext.sql("SELECT * FROM orc_table").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias(*, None)]
+- 'UnresolvedRelation `orc_table`, None
== Analyzed Logical Plan ==
key: string, value: string
Project [key#76,value#77]
+- SubqueryAlias orc_table
+- Relation[key#76,value#77] ORC part: struct<>, data: struct<key:string,value:string>
== Optimized Logical Plan ==
Relation[key#76,value#77] ORC part: struct<>, data: struct<key:string,value:string>
== Physical Plan ==
WholeStageCodegen
: +- Scan ORC part: struct<>, data: struct<key:string,value:string>[key#76,value#77] InputPaths: file:/user/hive/warehouse/orc_table
```
## How was this patch tested?
- Added a new unit test. Ran existing unit tests
- Ran with production like data
## Performance gains
Ran on a production table in Facebook (note that the data was in DWRF file format which is similar to ORC)
Best case : when there was no matching rows for the predicate in the query (everything is filtered out)
```
CPU time Wall time Total wall time across all tasks
================================================================
Without the change 541_515 sec 25.0 mins 165.8 hours
With change 407 sec 1.5 mins 15 mins
```
Average case: A subset of rows in the data match the query predicate
```
CPU time Wall time Total wall time across all tasks
================================================================
Without the change 624_630 sec 31.0 mins 199.0 h
With change 14_769 sec 5.3 mins 7.7 h
```
Author: Tejas Patil <tejasp@fb.com>
Closes #11891 from tejasapatil/orc_ppd.
Diffstat (limited to 'sql/hive/compatibility')
-rw-r--r-- | sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 695b5ef733..d8695bc5db 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.{Locale, TimeZone} -import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.scalatest.BeforeAndAfter +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.internal.SQLConf @@ -38,6 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { private val originalLocale = Locale.getDefault private val originalColumnBatchSize = TestHive.conf.columnBatchSize private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning + private val originalConvertMetastoreOrc = TestHive.convertMetastoreOrc def testCases: Seq[(String, File)] = { hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) @@ -56,6 +58,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Use Hive hash expression instead of the native one TestHive.sessionState.functionRegistry.unregisterFunction("hash") + // Ensures that the plans generation use metastore relation and not OrcRelation + // Was done because SqlBuilder does not work with plans having logical relation + TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false) RuleExecutor.resetTime() } @@ -66,6 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { Locale.setDefault(originalLocale) TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) + TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc) TestHive.sessionState.functionRegistry.restore() // For debugging dump some statistics about how much time was spent in various optimizer rules. |