author     Michael Armbrust <michael@databricks.com>    2014-08-18 13:17:10 -0700
committer  Michael Armbrust <michael@databricks.com>    2014-08-18 13:17:10 -0700
commit     3abd0c1cda09bb575adc99847a619bc84af37fd0
tree       7fbad5bd12995cbb047b77732f5212a05929cca1    /sql/hive
parent     9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6
[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables.
This PR adds an experimental flag `spark.sql.hive.convertMetastoreParquet` that, when true, causes the planner to detect tables that use Hive's Parquet SerDe and instead plan them using Spark SQL's native `ParquetTableScan`.

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1819 from marmbrus/parquetMetastore and squashes the following commits:

1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition. Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
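Usage illustration (not part of the commit): a minimal sketch of how the new flag might be exercised from a HiveContext, assuming an existing SparkContext `sc` and a metastore table `some_parquet_table` declared with Hive's Parquet SerDe. The table name is hypothetical; the configuration key and the plan-inspection pattern mirror the ParquetMetastoreSuite added below.

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.execution.HiveTableScan
    import org.apache.spark.sql.parquet.ParquetTableScan

    val hiveContext = new HiveContext(sc)   // sc: an existing SparkContext
    import hiveContext._

    // Opt in to the experimental conversion (it is off by default).
    setConf("spark.sql.hive.convertMetastoreParquet", "true")

    // `some_parquet_table` is a placeholder for a metastore table backed by the Parquet SerDe.
    val plan = sql("SELECT COUNT(*) FROM some_parquet_table").queryExecution.executedPlan

    // When the conversion applies, the physical plan uses Spark SQL's native ParquetTableScan
    // instead of a HiveTableScan (the same check ParquetMetastoreSuite performs).
    assert(plan.collect { case _: ParquetTableScan => true }.nonEmpty)
    assert(plan.collect { case _: HiveTableScan => true }.isEmpty)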
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala          119
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala  56
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala 171
4 files changed, 353 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8da676ffa..ff32c7c90a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+ /**
+ * When true, enables an experimental feature where metastore tables that use the parquet SerDe
+ * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
+ * SerDe.
+ */
+ private[spark] def convertMetastoreParquet: Boolean =
+ getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
@@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
+ ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5fcc1bd4b9..389ace726d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,14 +17,20 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+
+import scala.collection.JavaConversions._
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -32,6 +38,115 @@ private[hive] trait HiveStrategies {
val hiveContext: HiveContext
+ /**
+ * :: Experimental ::
+ * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
+ * table scan operator.
+ *
+ * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
+ * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
+ *
+ * Other issues:
+ * - Much of this logic assumes case insensitive resolution.
+ */
+ @Experimental
+ object ParquetConversion extends Strategy {
+ implicit class LogicalPlanHacks(s: SchemaRDD) {
+ def lowerCase =
+ new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+
+ def addPartitioningAttributes(attrs: Seq[Attribute]) =
+ new SchemaRDD(
+ s.sqlContext,
+ s.logicalPlan transform {
+ case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+ })
+ }
+
+ implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
+ def fakeOutput(newOutput: Seq[Attribute]) =
+ OutputFaker(
+ originalPlan.output.map(a =>
+ newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
+ .getOrElse(
+ sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
+ originalPlan)
+ }
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
+ if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
+ hiveContext.convertMetastoreParquet =>
+
+ // Filter out all predicates that only deal with partition keys
+ val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
+ val (pruningPredicates, otherPredicates) = predicates.partition {
+ _.references.map(_.exprId).subsetOf(partitionKeyIds)
+ }
+
+ // We are going to throw the predicates and projection back at the whole optimization
+ // sequence so lets unresolve all the attributes, allowing them to be rebound to the
+ // matching parquet attributes.
+ val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+ case a: AttributeReference => UnresolvedAttribute(a.name)
+ }).reduceOption(And).getOrElse(Literal(true))
+
+ val unresolvedProjection = projectList.map(_ transform {
+ case a: AttributeReference => UnresolvedAttribute(a.name)
+ })
+
+ if (relation.hiveQlTable.isPartitioned) {
+ val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+ // Translate the predicate so that it automatically casts the input values to the correct
+ // data types during evaluation
+ val castedPredicate = rawPredicate transform {
+ case a: AttributeReference =>
+ val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+ val key = relation.partitionKeys(idx)
+ Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
+ }
+
+ val inputData = new GenericMutableRow(relation.partitionKeys.size)
+ val pruningCondition =
+ if(codegenEnabled) {
+ GeneratePredicate(castedPredicate)
+ } else {
+ InterpretedPredicate(castedPredicate)
+ }
+
+ val partitions = relation.hiveQlPartitions.filter { part =>
+ val partitionValues = part.getValues
+ var i = 0
+ while (i < partitionValues.size()) {
+ inputData(i) = partitionValues(i)
+ i += 1
+ }
+ pruningCondition(inputData)
+ }
+
+ hiveContext
+ .parquetFile(partitions.map(_.getLocation).mkString(","))
+ .addPartitioningAttributes(relation.partitionKeys)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection:_*)
+ .queryExecution
+ .executedPlan
+ .fakeOutput(projectList.map(_.toAttribute)):: Nil
+ } else {
+ hiveContext
+ .parquetFile(relation.hiveQlTable.getDataLocation.getPath)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection:_*)
+ .queryExecution
+ .executedPlan
+ .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ }
+ case _ => Nil
+ }
+ }
+
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>
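To make the partition-pruning translation above concrete, here is a minimal sketch (not part of the commit) of the expression the strategy builds for a single partition key `p: Int` and the pruning predicate `p = 1`: the partition key is rebound to ordinal 0 of a row holding Hive's string-valued partition values, cast to the key's data type, and evaluated once per partition. The imports mirror the ones HiveStrategies itself uses; the literal values are made up.

    import org.apache.spark.sql._
    import org.apache.spark.sql.catalyst.expressions._

    // `p = 1` with the partition key bound to slot 0; Hive stores partition values as
    // strings, so the bound reference is cast to the key's data type before comparing.
    val castedPredicate =
      EqualTo(Cast(BoundReference(0, StringType, nullable = true), IntegerType), Literal(1))

    val pruningCondition = InterpretedPredicate(castedPredicate)

    // The strategy fills one mutable row per partition with that partition's values.
    val inputData = new GenericMutableRow(1)
    inputData(0) = "1"
    pruningCondition(inputData)   // true: this partition's files are scanned
    inputData(0) = "2"
    pruningCondition(inputData)   // false: this partition is pruned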
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
new file mode 100644
index 0000000000..544abfc324
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.parquet
+
+import java.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
+import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.io.Writable
+
+/**
+ * A placeholder that allows SparkSQL users to create metastore tables that are stored as
+ * parquet files. It is only intended to pass the checks that the serde is valid and exists
+ * when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
+ * when "spark.sql.hive.convertMetastoreParquet" is set to true.
+ */
+@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
+ "placeholder in the Hive MetaStore")
+class FakeParquetSerDe extends SerDe {
+ override def getObjectInspector: ObjectInspector = new ObjectInspector {
+ override def getCategory: Category = Category.PRIMITIVE
+
+ override def getTypeName: String = "string"
+ }
+
+ override def deserialize(p1: Writable): AnyRef = throwError
+
+ override def initialize(p1: Configuration, p2: Properties): Unit = {}
+
+ override def getSerializedClass: Class[_ <: Writable] = throwError
+
+ override def getSerDeStats: SerDeStats = throwError
+
+ override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError
+
+ private def throwError =
+ sys.error(
+ "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
+}
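For illustration only: a hypothetical CREATE TABLE showing how the placeholder SerDe might be referenced when registering a Parquet-backed table in the metastore. The table name and location are made up, and the INPUTFORMAT/OUTPUTFORMAT are simply the Hive Parquet formats used in the test suite below; as the error message above notes, reads against such a table only work once spark.sql.hive.convertMetastoreParquet is true, because the actual decoding is done by ParquetTableScan rather than by the SerDe.

    // Hypothetical DDL; table name, location, and storage formats are assumptions for this sketch.
    hiveContext.sql("""
      CREATE EXTERNAL TABLE placeholder_parquet (
        intField INT,
        stringField STRING
      )
      ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
      STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      LOCATION '/path/to/parquet/files'
    """)

    // Enable the conversion so queries over the table are planned with ParquetTableScan.
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")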
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
new file mode 100644
index 0000000000..0723be7298
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -0,0 +1,171 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.scalatest.BeforeAndAfterAll
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+
+case class ParquetData(intField: Int, stringField: String)
+
+/**
+ * Tests for our SerDe -> Native parquet scan conversion.
+ */
+class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ setConf("spark.sql.hive.convertMetastoreParquet", "true")
+ }
+
+ override def afterAll(): Unit = {
+ setConf("spark.sql.hive.convertMetastoreParquet", "false")
+ }
+
+ val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDir.delete()
+ partitionedTableDir.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDir, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetData(i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+
+ sql(s"""
+ create external table partitioned_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDir.getCanonicalPath}'
+ """)
+
+ sql(s"""
+ create external table normal_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ """)
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+ }
+
+ test("project the partitioning column") {
+ checkAnswer(
+ sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
+ (1, 10) ::
+ (2, 10) ::
+ (3, 10) ::
+ (4, 10) ::
+ (5, 10) ::
+ (6, 10) ::
+ (7, 10) ::
+ (8, 10) ::
+ (9, 10) ::
+ (10, 10) :: Nil
+ )
+ }
+
+ test("project partitioning and non-partitioning columns") {
+ checkAnswer(
+ sql("SELECT stringField, p, count(intField) " +
+ "FROM partitioned_parquet GROUP BY p, stringField"),
+ ("part-1", 1, 10) ::
+ ("part-2", 2, 10) ::
+ ("part-3", 3, 10) ::
+ ("part-4", 4, 10) ::
+ ("part-5", 5, 10) ::
+ ("part-6", 6, 10) ::
+ ("part-7", 7, 10) ::
+ ("part-8", 8, 10) ::
+ ("part-9", 9, 10) ::
+ ("part-10", 10, 10) :: Nil
+ )
+ }
+
+ test("simple count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet"),
+ 100)
+ }
+
+ test("pruned count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
+ 10)
+ }
+
+ test("multi-partition pruned count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
+ 30)
+ }
+
+ test("non-partition predicates") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
+ 30)
+ }
+
+ test("sum") {
+ checkAnswer(
+ sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
+ 1 + 2 + 3
+ )
+ }
+
+ test("non-part select(*)") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM normal_parquet"),
+ 10
+ )
+ }
+
+ test("conversion is working") {
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: HiveTableScan => true
+ }.isEmpty)
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: ParquetTableScan => true
+ }.nonEmpty)
+ }
+}