author     Michael Armbrust <michael@databricks.com>    2014-08-18 13:17:10 -0700
committer  Michael Armbrust <michael@databricks.com>    2014-08-18 13:17:10 -0700
commit     3abd0c1cda09bb575adc99847a619bc84af37fd0 (patch)
tree       7fbad5bd12995cbb047b77732f5212a05929cca1 /sql
parent     9eb74c7d2cbe127dd4c32bf1a8318497b2fb55b6 (diff)
download   spark-3abd0c1cda09bb575adc99847a619bc84af37fd0.tar.gz
           spark-3abd0c1cda09bb575adc99847a619bc84af37fd0.tar.bz2
           spark-3abd0c1cda09bb575adc99847a619bc84af37fd0.zip
[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables.
This PR adds an experimental flag `spark.sql.hive.convertMetastoreParquet` that, when true, causes the planner to detect tables that use Hive's Parquet SerDe and instead plan them using Spark SQL's native `ParquetTableScan`.

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1819 from marmbrus/parquetMetastore and squashes the following commits:

1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition. Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
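As a quick usage sketch (not part of this patch), the flag can be turned on against an existing HiveContext and the effect checked by inspecting the physical plan, much like the new ParquetMetastoreSuite does; `hiveContext` and the table name here are assumed:

    // Usage sketch, assuming an existing HiveContext `hiveContext` and a Parquet-backed
    // metastore table named `some_parquet_table` (both hypothetical).
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

    val executedPlan =
      hiveContext.sql("SELECT * FROM some_parquet_table").queryExecution.executedPlan
    // With the flag on, the plan should contain a ParquetTableScan rather than a HiveTableScan.
    println(executedPlan)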
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala       |  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala        |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala |  74
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               |   9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala            | 119
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala  |  56
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala  | 171
7 files changed, 427 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 0027f3cf1f..f9dfa3c92f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}
+
+/**
+ * :: DeveloperApi ::
+ * A plan node that does nothing but lie about the output of its child. Used to splice a
+ * (hopefully structurally equivalent) tree from a different optimization sequence into an already
+ * resolved tree.
+ */
+@DeveloperApi
+case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
+ def children = child :: Nil
+ def execute() = child.execute()
+}
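The splice trick above can be illustrated with a self-contained toy model (plain Scala, hypothetical types, not Spark code): the wrapper advertises the attribute names the already-resolved parent expects while delegating all row production to its child.

    // Toy model of the OutputFaker idea: report one schema, delegate evaluation to the child.
    object OutputFakerToy extends App {
      trait Plan { def output: Seq[String]; def rows: Iterator[Seq[Any]] }
      case class Scan(output: Seq[String], data: Seq[Seq[Any]]) extends Plan {
        def rows = data.iterator
      }
      case class Faker(output: Seq[String], child: Plan) extends Plan {
        def rows = child.rows // execution is untouched; only the advertised output changes
      }

      val parquetScan = Scan(Seq("intfield", "stringfield"), Seq(Seq(1, "a"), Seq(2, "b")))
      val spliced = Faker(Seq("intField", "stringField"), parquetScan) // case-correct names
      println(spliced.output)
      spliced.rows.foreach(println)
    }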
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 053b2a1543..1713ae6fb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
private[sql] case class ParquetRelation(
path: String,
@transient conf: Option[Configuration],
- @transient sqlContext: SQLContext)
+ @transient sqlContext: SQLContext,
+ partitioningAttributes: Seq[Attribute] = Nil)
extends LeafNode with MultiInstanceRelation {
self: Product =>
@@ -61,12 +62,13 @@ private[sql] case class ParquetRelation(
/** Attributes */
override val output =
+ partitioningAttributes ++
ParquetTypesConverter.readSchemaFromFile(
- new Path(path),
+ new Path(path.split(",").head),
conf,
sqlContext.isParquetBinaryAsString)
- override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
+ override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation,
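Read together, the two changes above let one relation span several partition directories (a comma-separated path) while prepending the partition columns to the schema read from the first path's footer. A hedged sketch of constructing this internal API; the paths are hypothetical and `sqlContext`/`partitionKeys` are assumed to be in scope:

    // Hedged sketch of the extended (internal) ParquetRelation constructor from this patch.
    val relation = ParquetRelation(
      path = Seq("/warehouse/tbl/p=1", "/warehouse/tbl/p=2").mkString(","), // hypothetical dirs
      conf = None,
      sqlContext = sqlContext,
      partitioningAttributes = partitionKeys) // e.g. the metastore partition columns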
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index c6dca10f6a..f6cfab736d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
@@ -42,6 +43,7 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
@@ -60,11 +62,18 @@ case class ParquetTableScan(
// The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
// by exprId. note: output cannot be transient, see
// https://issues.apache.org/jira/browse/SPARK-1367
- val output = attributes.map { a =>
- relation.output
- .find(o => o.exprId == a.exprId)
- .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
- }
+ val normalOutput =
+ attributes
+ .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
+ .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+
+ val partOutput =
+ attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+
+ def output = partOutput ++ normalOutput
+
+ assert(normalOutput.size + partOutput.size == attributes.size,
+ s"$normalOutput + $partOutput != $attributes, ${relation.output}")
override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
@@ -72,16 +81,19 @@ case class ParquetTableScan(
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
val conf: Configuration = ContextUtil.getConfiguration(job)
- val qualifiedPath = {
- val path = new Path(relation.path)
- path.getFileSystem(conf).makeQualified(path)
+
+ relation.path.split(",").foreach { curPath =>
+ val qualifiedPath = {
+ val path = new Path(curPath)
+ path.getFileSystem(conf).makeQualified(path)
+ }
+ NewFileInputFormat.addInputPath(job, qualifiedPath)
}
- NewFileInputFormat.addInputPath(job, qualifiedPath)
// Store both requested and original schema in `Configuration`
conf.set(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- ParquetTypesConverter.convertToString(output))
+ ParquetTypesConverter.convertToString(normalOutput))
conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(relation.output))
@@ -102,13 +114,41 @@ case class ParquetTableScan(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
- sc.newAPIHadoopRDD(
- conf,
- classOf[FilteringParquetRowInputFormat],
- classOf[Void],
- classOf[Row])
- .map(_._2)
- .filter(_ != null) // Parquet's record filters may produce null values
+ val baseRDD =
+ new org.apache.spark.rdd.NewHadoopRDD(
+ sc,
+ classOf[FilteringParquetRowInputFormat],
+ classOf[Void],
+ classOf[Row],
+ conf)
+
+ if (partOutput.nonEmpty) {
+ baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+ val partValue = "([^=]+)=([^=]+)".r
+ val partValues =
+ split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+ .getPath
+ .toString
+ .split("/")
+ .flatMap {
+ case partValue(key, value) => Some(key -> value)
+ case _ => None
+ }.toMap
+
+ val partitionRowValues =
+ partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+
+ new Iterator[Row] {
+ private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)
+
+ def hasNext = iter.hasNext
+
+ def next() = joinedRow.withRight(iter.next()._2)
+ }
+ }
+ } else {
+ baseRDD.map(_._2)
+ }.filter(_ != null) // Parquet's record filters may produce null values
}
/**
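The partition-value recovery above depends only on the key=value naming convention of partition directories. A self-contained sketch of just that step, with a hypothetical split path:

    // Self-contained demo of the key=value extraction used by ParquetTableScan above.
    object PartitionValuesDemo extends App {
      val partValue = "([^=]+)=([^=]+)".r
      // Hypothetical path of a Parquet split inside a partitioned table directory.
      val splitPath = "hdfs://nn/warehouse/partitioned_parquet/p=3/part-00000.parquet"

      val partValues = splitPath.split("/").flatMap {
        case partValue(key, value) => Some(key -> value)
        case _                     => None
      }.toMap

      println(partValues) // Map(p -> 3)
    }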
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8da676ffa..ff32c7c90a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+ /**
+ * When true, enables an experimental feature where metastore tables that use the parquet SerDe
+ * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
+ * SerDe.
+ */
+ private[spark] def convertMetastoreParquet: Boolean =
+ getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
@@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
+ ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5fcc1bd4b9..389ace726d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,14 +17,20 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+
+import scala.collection.JavaConversions._
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -32,6 +38,115 @@ private[hive] trait HiveStrategies {
val hiveContext: HiveContext
+ /**
+ * :: Experimental ::
+ * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
+ * table scan operator.
+ *
+ * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
+ * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
+ *
+ * Other issues:
+ * - Much of this logic assumes case insensitive resolution.
+ */
+ @Experimental
+ object ParquetConversion extends Strategy {
+ implicit class LogicalPlanHacks(s: SchemaRDD) {
+ def lowerCase =
+ new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+
+ def addPartitioningAttributes(attrs: Seq[Attribute]) =
+ new SchemaRDD(
+ s.sqlContext,
+ s.logicalPlan transform {
+ case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+ })
+ }
+
+ implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
+ def fakeOutput(newOutput: Seq[Attribute]) =
+ OutputFaker(
+ originalPlan.output.map(a =>
+ newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
+ .getOrElse(
+ sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
+ originalPlan)
+ }
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
+ if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
+ hiveContext.convertMetastoreParquet =>
+
+ // Filter out all predicates that only deal with partition keys
+ val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
+ val (pruningPredicates, otherPredicates) = predicates.partition {
+ _.references.map(_.exprId).subsetOf(partitionKeyIds)
+ }
+
+ // We are going to throw the predicates and projection back at the whole optimization
+      // sequence so let's unresolve all the attributes, allowing them to be rebound to the
+ // matching parquet attributes.
+ val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+ case a: AttributeReference => UnresolvedAttribute(a.name)
+ }).reduceOption(And).getOrElse(Literal(true))
+
+ val unresolvedProjection = projectList.map(_ transform {
+ case a: AttributeReference => UnresolvedAttribute(a.name)
+ })
+
+ if (relation.hiveQlTable.isPartitioned) {
+ val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+ // Translate the predicate so that it automatically casts the input values to the correct
+ // data types during evaluation
+ val castedPredicate = rawPredicate transform {
+ case a: AttributeReference =>
+ val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+ val key = relation.partitionKeys(idx)
+ Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
+ }
+
+ val inputData = new GenericMutableRow(relation.partitionKeys.size)
+ val pruningCondition =
+ if(codegenEnabled) {
+ GeneratePredicate(castedPredicate)
+ } else {
+ InterpretedPredicate(castedPredicate)
+ }
+
+ val partitions = relation.hiveQlPartitions.filter { part =>
+ val partitionValues = part.getValues
+ var i = 0
+ while (i < partitionValues.size()) {
+ inputData(i) = partitionValues(i)
+ i += 1
+ }
+ pruningCondition(inputData)
+ }
+
+ hiveContext
+ .parquetFile(partitions.map(_.getLocation).mkString(","))
+ .addPartitioningAttributes(relation.partitionKeys)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection:_*)
+ .queryExecution
+ .executedPlan
+          .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ } else {
+ hiveContext
+ .parquetFile(relation.hiveQlTable.getDataLocation.getPath)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection:_*)
+ .queryExecution
+ .executedPlan
+ .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ }
+ case _ => Nil
+ }
+ }
+
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>
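The pruning path in ParquetConversion evaluates the partition-key predicate against each partition's string values and only hands the surviving locations to parquetFile. A simplified, Catalyst-free model of that flow (all names hypothetical):

    // Simplified model of the partition pruning above: keep only partitions whose values pass
    // the pruning predicate, then join their locations into the comma-separated scan path.
    object PartitionPruningDemo extends App {
      case class HivePartition(values: Seq[String], location: String)

      val partitions = (1 to 10).map(p => HivePartition(Seq(p.toString), s"/warehouse/tbl/p=$p"))

      // Stands in for `pruningCondition`, e.g. `p IN (1, 2, 3)` with the string partition value
      // cast back to an integer before comparison.
      val pruningCondition: Seq[String] => Boolean = values => Set(1, 2, 3)(values.head.toInt)

      val selectedPath =
        partitions.filter(p => pruningCondition(p.values)).map(_.location).mkString(",")
      println(selectedPath) // /warehouse/tbl/p=1,/warehouse/tbl/p=2,/warehouse/tbl/p=3
    }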
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
new file mode 100644
index 0000000000..544abfc324
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.parquet
+
+import java.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
+import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.io.Writable
+
+/**
+ * A placeholder that allows SparkSQL users to create metastore tables that are stored as
+ * parquet files. It is only intended to pass the checks that the serde is valid and exists
+ * when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
+ * when "spark.sql.hive.convertMetastoreParquet" is set to true.
+ */
+@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
+ "placeholder in the Hive MetaStore")
+class FakeParquetSerDe extends SerDe {
+ override def getObjectInspector: ObjectInspector = new ObjectInspector {
+ override def getCategory: Category = Category.PRIMITIVE
+
+ override def getTypeName: String = "string"
+ }
+
+ override def deserialize(p1: Writable): AnyRef = throwError
+
+ override def initialize(p1: Configuration, p2: Properties): Unit = {}
+
+ override def getSerializedClass: Class[_ <: Writable] = throwError
+
+ override def getSerDeStats: SerDeStats = throwError
+
+ override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError
+
+ private def throwError =
+ sys.error(
+ "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
+}
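As the scaladoc notes, FakeParquetSerDe only exists so that CREATE TABLE succeeds; any real (de)serialization through it fails with the error above. A hedged illustration of registering such a table, reusing the input/output formats from the test suite below; the table name and location are hypothetical:

    // Hedged illustration; with spark.sql.hive.convertMetastoreParquet=true, reads of this
    // table are expected to be planned as a native ParquetTableScan, never touching the SerDe.
    hiveContext.sql("""
      CREATE EXTERNAL TABLE fake_serde_parquet (intField INT, stringField STRING)
      ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
      STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      LOCATION '/path/to/existing/parquet/data'""")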
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
new file mode 100644
index 0000000000..0723be7298
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -0,0 +1,171 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.scalatest.BeforeAndAfterAll
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+
+case class ParquetData(intField: Int, stringField: String)
+
+/**
+ * Tests for our SerDe -> Native parquet scan conversion.
+ */
+class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ setConf("spark.sql.hive.convertMetastoreParquet", "true")
+ }
+
+ override def afterAll(): Unit = {
+ setConf("spark.sql.hive.convertMetastoreParquet", "false")
+ }
+
+ val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDir.delete()
+ partitionedTableDir.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDir, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetData(i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+
+ sql(s"""
+ create external table partitioned_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDir.getCanonicalPath}'
+ """)
+
+ sql(s"""
+ create external table normal_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ """)
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+ }
+
+ test("project the partitioning column") {
+ checkAnswer(
+ sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
+ (1, 10) ::
+ (2, 10) ::
+ (3, 10) ::
+ (4, 10) ::
+ (5, 10) ::
+ (6, 10) ::
+ (7, 10) ::
+ (8, 10) ::
+ (9, 10) ::
+ (10, 10) :: Nil
+ )
+ }
+
+ test("project partitioning and non-partitioning columns") {
+ checkAnswer(
+ sql("SELECT stringField, p, count(intField) " +
+ "FROM partitioned_parquet GROUP BY p, stringField"),
+ ("part-1", 1, 10) ::
+ ("part-2", 2, 10) ::
+ ("part-3", 3, 10) ::
+ ("part-4", 4, 10) ::
+ ("part-5", 5, 10) ::
+ ("part-6", 6, 10) ::
+ ("part-7", 7, 10) ::
+ ("part-8", 8, 10) ::
+ ("part-9", 9, 10) ::
+ ("part-10", 10, 10) :: Nil
+ )
+ }
+
+ test("simple count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet"),
+ 100)
+ }
+
+ test("pruned count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
+ 10)
+ }
+
+ test("multi-partition pruned count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
+ 30)
+ }
+
+ test("non-partition predicates") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
+ 30)
+ }
+
+ test("sum") {
+ checkAnswer(
+ sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
+ 1 + 2 + 3
+ )
+ }
+
+ test("non-part select(*)") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM normal_parquet"),
+ 10
+ )
+ }
+
+ test("conversion is working") {
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: HiveTableScan => true
+ }.isEmpty)
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: ParquetTableScan => true
+ }.nonEmpty)
+ }
+}