diff options
author | Michael Armbrust <michael@databricks.com> | 2014-08-18 13:17:10 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-08-18 13:17:10 -0700 |
commit | 3abd0c1cda09bb575adc99847a619bc84af37fd0 (patch) | |
tree | 7fbad5bd12995cbb047b77732f5212a05929cca1 /sql/hive | |
parent | 9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6 (diff) | |
download | spark-3abd0c1cda09bb575adc99847a619bc84af37fd0.tar.gz spark-3abd0c1cda09bb575adc99847a619bc84af37fd0.tar.bz2 spark-3abd0c1cda09bb575adc99847a619bc84af37fd0.zip |
[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables.
This PR adds an experimental flag `spark.sql.hive.convertMetastoreParquet` that when true causes the planner to detects tables that use Hive's Parquet SerDe and instead plans them using Spark SQL's native `ParquetTableScan`.
Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #1819 from marmbrus/parquetMetastore and squashes the following commits:
1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition. Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
Diffstat (limited to 'sql/hive')
4 files changed, 353 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index a8da676ffa..ff32c7c90a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { // Change the default SQL dialect to HiveQL override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql") + /** + * When true, enables an experimental feature where metastore tables that use the parquet SerDe + * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive + * SerDe. + */ + private[spark] def convertMetastoreParquet: Boolean = + getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true" + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan } @@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { TakeOrdered, ParquetOperations, InMemoryScans, + ParquetConversion, // Must be before HiveTableScans HiveTableScans, DataSinks, Scripts, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 5fcc1bd4b9..389ace726d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,14 +17,20 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.SQLContext +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema} import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan} + +import scala.collection.JavaConversions._ private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. @@ -32,6 +38,115 @@ private[hive] trait HiveStrategies { val hiveContext: HiveContext + /** + * :: Experimental :: + * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet + * table scan operator. + * + * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring + * but since this is after the code freeze for 1.1 all logic is here to minimize disruption. + * + * Other issues: + * - Much of this logic assumes case insensitive resolution. + */ + @Experimental + object ParquetConversion extends Strategy { + implicit class LogicalPlanHacks(s: SchemaRDD) { + def lowerCase = + new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan)) + + def addPartitioningAttributes(attrs: Seq[Attribute]) = + new SchemaRDD( + s.sqlContext, + s.logicalPlan transform { + case p: ParquetRelation => p.copy(partitioningAttributes = attrs) + }) + } + + implicit class PhysicalPlanHacks(originalPlan: SparkPlan) { + def fakeOutput(newOutput: Seq[Attribute]) = + OutputFaker( + originalPlan.output.map(a => + newOutput.find(a.name.toLowerCase == _.name.toLowerCase) + .getOrElse( + sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))), + originalPlan) + } + + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) + if relation.tableDesc.getSerdeClassName.contains("Parquet") && + hiveContext.convertMetastoreParquet => + + // Filter out all predicates that only deal with partition keys + val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet + val (pruningPredicates, otherPredicates) = predicates.partition { + _.references.map(_.exprId).subsetOf(partitionKeyIds) + } + + // We are going to throw the predicates and projection back at the whole optimization + // sequence so lets unresolve all the attributes, allowing them to be rebound to the + // matching parquet attributes. + val unresolvedOtherPredicates = otherPredicates.map(_ transform { + case a: AttributeReference => UnresolvedAttribute(a.name) + }).reduceOption(And).getOrElse(Literal(true)) + + val unresolvedProjection = projectList.map(_ transform { + case a: AttributeReference => UnresolvedAttribute(a.name) + }) + + if (relation.hiveQlTable.isPartitioned) { + val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true)) + // Translate the predicate so that it automatically casts the input values to the correct + // data types during evaluation + val castedPredicate = rawPredicate transform { + case a: AttributeReference => + val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId) + val key = relation.partitionKeys(idx) + Cast(BoundReference(idx, StringType, nullable = true), key.dataType) + } + + val inputData = new GenericMutableRow(relation.partitionKeys.size) + val pruningCondition = + if(codegenEnabled) { + GeneratePredicate(castedPredicate) + } else { + InterpretedPredicate(castedPredicate) + } + + val partitions = relation.hiveQlPartitions.filter { part => + val partitionValues = part.getValues + var i = 0 + while (i < partitionValues.size()) { + inputData(i) = partitionValues(i) + i += 1 + } + pruningCondition(inputData) + } + + hiveContext + .parquetFile(partitions.map(_.getLocation).mkString(",")) + .addPartitioningAttributes(relation.partitionKeys) + .lowerCase + .where(unresolvedOtherPredicates) + .select(unresolvedProjection:_*) + .queryExecution + .executedPlan + .fakeOutput(projectList.map(_.toAttribute)):: Nil + } else { + hiveContext + .parquetFile(relation.hiveQlTable.getDataLocation.getPath) + .lowerCase + .where(unresolvedOtherPredicates) + .select(unresolvedProjection:_*) + .queryExecution + .executedPlan + .fakeOutput(projectList.map(_.toAttribute)) :: Nil + } + case _ => Nil + } + } + object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala new file mode 100644 index 0000000000..544abfc324 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.parquet + +import java.util.Properties + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category +import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector +import org.apache.hadoop.io.Writable + +/** + * A placeholder that allows SparkSQL users to create metastore tables that are stored as + * parquet files. It is only intended to pass the checks that the serde is valid and exists + * when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan + * when "spark.sql.hive.convertMetastoreParquet" is set to true. + */ +@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " + + "placeholder in the Hive MetaStore") +class FakeParquetSerDe extends SerDe { + override def getObjectInspector: ObjectInspector = new ObjectInspector { + override def getCategory: Category = Category.PRIMITIVE + + override def getTypeName: String = "string" + } + + override def deserialize(p1: Writable): AnyRef = throwError + + override def initialize(p1: Configuration, p2: Properties): Unit = {} + + override def getSerializedClass: Class[_ <: Writable] = throwError + + override def getSerDeStats: SerDeStats = throwError + + override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError + + private def throwError = + sys.error( + "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe") +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala new file mode 100644 index 0000000000..0723be7298 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala @@ -0,0 +1,171 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import java.io.File + +import org.apache.spark.sql.hive.execution.HiveTableScan +import org.scalatest.BeforeAndAfterAll + +import scala.reflect.ClassTag + +import org.apache.spark.sql.{SQLConf, QueryTest} +import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ + +case class ParquetData(intField: Int, stringField: String) + +/** + * Tests for our SerDe -> Native parquet scan conversion. + */ +class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + setConf("spark.sql.hive.convertMetastoreParquet", "true") + } + + override def afterAll(): Unit = { + setConf("spark.sql.hive.convertMetastoreParquet", "false") + } + + val partitionedTableDir = File.createTempFile("parquettests", "sparksql") + partitionedTableDir.delete() + partitionedTableDir.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDir, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-$p")) + .saveAsParquetFile(partDir.getCanonicalPath) + } + + sql(s""" + create external table partitioned_parquet + ( + intField INT, + stringField STRING + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDir.getCanonicalPath}' + """) + + sql(s""" + create external table normal_parquet + ( + intField INT, + stringField STRING + ) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${new File(partitionedTableDir, "p=1").getCanonicalPath}' + """) + + (1 to 10).foreach { p => + sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)") + } + + test("project the partitioning column") { + checkAnswer( + sql("SELECT p, count(*) FROM partitioned_parquet group by p"), + (1, 10) :: + (2, 10) :: + (3, 10) :: + (4, 10) :: + (5, 10) :: + (6, 10) :: + (7, 10) :: + (8, 10) :: + (9, 10) :: + (10, 10) :: Nil + ) + } + + test("project partitioning and non-partitioning columns") { + checkAnswer( + sql("SELECT stringField, p, count(intField) " + + "FROM partitioned_parquet GROUP BY p, stringField"), + ("part-1", 1, 10) :: + ("part-2", 2, 10) :: + ("part-3", 3, 10) :: + ("part-4", 4, 10) :: + ("part-5", 5, 10) :: + ("part-6", 6, 10) :: + ("part-7", 7, 10) :: + ("part-8", 8, 10) :: + ("part-9", 9, 10) :: + ("part-10", 10, 10) :: Nil + ) + } + + test("simple count") { + checkAnswer( + sql("SELECT COUNT(*) FROM partitioned_parquet"), + 100) + } + + test("pruned count") { + checkAnswer( + sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"), + 10) + } + + test("multi-partition pruned count") { + checkAnswer( + sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"), + 30) + } + + test("non-partition predicates") { + checkAnswer( + sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"), + 30) + } + + test("sum") { + checkAnswer( + sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"), + 1 + 2 + 3 + ) + } + + test("non-part select(*)") { + checkAnswer( + sql("SELECT COUNT(*) FROM normal_parquet"), + 10 + ) + } + + test("conversion is working") { + assert( + sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { + case _: HiveTableScan => true + }.isEmpty) + assert( + sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { + case _: ParquetTableScan => true + }.nonEmpty) + } +} |