author     Michael Armbrust <michael@databricks.com>    2014-11-20 18:31:02 -0800
committer  Michael Armbrust <michael@databricks.com>    2014-11-20 18:31:31 -0800
commit     64b30be7e4cb86059bbfeb3e2f8f47f41d015862 (patch)
tree       d008066bbf0d0214fa8fbb3f3a7e914445ff6ff4 /sql
parent     0f6a2eeaf20363061f9ed2d9062f3a7022c2c8ba (diff)
[SPARK-4413][SQL] Parquet support through datasource API
Goals:
- Support for accessing parquet using SQL without requiring Hive (which in turn allows parquet tables with decimal columns)
- Support for folder based partitioning with automatic discovery of available partitions
- Caching of file metadata
See the scaladoc of `ParquetRelation2` for more details.
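As a rough usage sketch (the table name and path below are placeholders), a directory of optionally partitioned parquet files can be registered and queried directly, without a Hive metastore:

```scala
// Hypothetical directory layout, with one integer partitioning column `p`:
//   /data/events/p=1/part-00000.parquet
//   /data/events/p=2/part-00000.parquet
//   ...

// Register the files as a temporary table through the data sources API.
sqlContext.sql("""
  CREATE TEMPORARY TABLE events
  USING org.apache.spark.sql.parquet
  OPTIONS (
    path '/data/events'
  )
""")

// Partition values are discovered from the directory names, so this query
// only needs to read the files under p=1.
sqlContext.sql("SELECT count(*) FROM events WHERE p = 1").collect()
```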
Author: Michael Armbrust <michael@databricks.com>
Closes #3269 from marmbrus/newParquet and squashes the following commits:
1dd75f1 [Michael Armbrust] Pass all paths for FileInputFormat at once.
645768b [Michael Armbrust] Review comments.
abd8e2f [Michael Armbrust] Alternative implementation of parquet based on the datasources API.
938019e [Michael Armbrust] Add an experimental interface to data sources that exposes catalyst expressions.
e9d2641 [Michael Armbrust] logging / formatting improvements.
(cherry picked from commit 02ec058efe24348cdd3691b55942e6f0ef138732)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala    4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala               290
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala        43
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala)  178
5 files changed, 458 insertions, 79 deletions
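The partition discovery added in `newParquet.scala` (full diff below) works by matching child directory names of the form `key=value`. A small standalone sketch of that matching step, using made-up directory names:

```scala
// Sketch of the key=value directory matching used by ParquetRelation2.discoverPartitions().
// The directory names are hypothetical; only one distinct partition key is supported,
// and its values are interpreted as integers.
val partValue = "([^=]+)=([^=]+)".r

val childDirNames = Seq("p=1", "p=2", "p=10", "_temporary")

// Directories that do not look like key=value (e.g. _temporary) are skipped.
val partitionPairs = childDirNames.collect { case partValue(key, value) => (key, value) }
// partitionPairs == Seq(("p", "1"), ("p", "2"), ("p", "10"))

val foundKeys = partitionPairs.map(_._1).distinct
require(foundKeys.size <= 1, s"Too many distinct partition keys: $foundKeys")

val partitionValues = partitionPairs.map { case (key, value) => Map(key -> value.toInt) }
// partitionValues == Seq(Map("p" -> 1), Map("p" -> 2), Map("p" -> 10))
```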
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 5d0643a64a..0e36852ddd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -361,7 +361,7 @@ private[parquet] class FilteringParquetRowInputFormat private var footers: JList[Footer] = _ - private var fileStatuses= Map.empty[Path, FileStatus] + private var fileStatuses = Map.empty[Path, FileStatus] override def createRecordReader( inputSplit: InputSplit, @@ -405,7 +405,9 @@ private[parquet] class FilteringParquetRowInputFormat } val newFooters = new mutable.HashMap[FileStatus, Footer] if (toFetch.size > 0) { + val startFetch = System.currentTimeMillis val fetched = getFooters(conf, toFetch) + logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms") for ((status, i) <- toFetch.zipWithIndex) { newFooters(status) = fetched.get(i) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala new file mode 100644 index 0000000000..bea12e6dd6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.parquet + +import java.util.{List => JList} + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job} + +import parquet.hadoop.ParquetInputFormat +import parquet.hadoop.util.ContextUtil + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.{Partition => SparkPartition, Logging} +import org.apache.spark.rdd.{NewHadoopPartition, RDD} + +import org.apache.spark.sql.{SQLConf, Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, And, Expression, Attribute} +import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.sources._ + +import scala.collection.JavaConversions._ + +/** + * Allows creation of parquet based tables using the syntax + * `CREATE TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option required + * is `path`, which should be the location of a collection of, optionally partitioned, + * parquet files. + */ +class DefaultSource extends RelationProvider { + /** Returns a new base relation with the given parameters. 
*/ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + val path = + parameters.getOrElse("path", sys.error("'path' must be specifed for parquet tables.")) + + ParquetRelation2(path)(sqlContext) + } +} + +private[parquet] case class Partition(partitionValues: Map[String, Any], files: Seq[FileStatus]) + +/** + * An alternative to [[ParquetRelation]] that plugs in using the data sources API. This class is + * currently not intended as a full replacement of the parquet support in Spark SQL though it is + * likely that it will eventually subsume the existing physical plan implementation. + * + * Compared with the current implementation, this class has the following notable differences: + * + * Partitioning: Partitions are auto discovered and must be in the form of directories `key=value/` + * located at `path`. Currently only a single partitioning column is supported and it must + * be an integer. This class supports both fully self-describing data, which contains the partition + * key, and data where the partition key is only present in the folder structure. The presence + * of the partitioning key in the data is also auto-detected. The `null` partition is not yet + * supported. + * + * Metadata: The metadata is automatically discovered by reading the first parquet file present. + * There is currently no support for working with files that have different schema. Additionally, + * when parquet metadata caching is turned on, the FileStatus objects for all data will be cached + * to improve the speed of interactive querying. When data is added to a table it must be dropped + * and recreated to pick up any changes. + * + * Statistics: Statistics for the size of the table are automatically populated during metadata + * discovery. + */ +@DeveloperApi +case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) + extends CatalystScan with Logging { + + def sparkContext = sqlContext.sparkContext + + // Minor Hack: scala doesnt seem to respect @transient for vals declared via extraction + @transient + private var partitionKeys: Seq[String] = _ + @transient + private var partitions: Seq[Partition] = _ + discoverPartitions() + + // TODO: Only finds the first partition, assumes the key is of type Integer... + private def discoverPartitions() = { + val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration) + val partValue = "([^=]+)=([^=]+)".r + + val childrenOfPath = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.startsWith("_")) + val childDirs = childrenOfPath.filter(s => s.isDir) + + if (childDirs.size > 0) { + val partitionPairs = childDirs.map(_.getPath.getName).map { + case partValue(key, value) => (key, value) + } + + val foundKeys = partitionPairs.map(_._1).distinct + if (foundKeys.size > 1) { + sys.error(s"Too many distinct partition keys: $foundKeys") + } + + // Do a parallel lookup of partition metadata. + val partitionFiles = + childDirs.par.map { d => + fs.listStatus(d.getPath) + // TODO: Is there a standard hadoop function for this? 
+ .filterNot(_.getPath.getName.startsWith("_")) + .filterNot(_.getPath.getName.startsWith(".")) + }.seq + + partitionKeys = foundKeys.toSeq + partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, value)) => + Partition(Map(key -> value.toInt), files) + }.toSeq + } else { + partitionKeys = Nil + partitions = Partition(Map.empty, childrenOfPath) :: Nil + } + } + + override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum + + val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. + ParquetTypesConverter.readSchemaFromFile( + partitions.head.files.head.getPath, + Some(sparkContext.hadoopConfiguration), + sqlContext.isParquetBinaryAsString)) + + val dataIncludesKey = + partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) + + override val schema = + if (dataIncludesKey) { + dataSchema + } else { + StructType(dataSchema.fields :+ StructField(partitionKeys.head, IntegerType)) + } + + override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = { + // This is mostly a hack so that we can use the existing parquet filter code. + val requiredColumns = output.map(_.name) + // TODO: Parquet filters should be based on data sources API, not catalyst expressions. + val filters = DataSourceStrategy.selectFilters(predicates) + + val job = new Job(sparkContext.hadoopConfiguration) + ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) + val jobConf: Configuration = ContextUtil.getConfiguration(job) + + val requestedSchema = StructType(requiredColumns.map(schema(_))) + + // TODO: Make folder based partitioning a first class citizen of the Data Sources API. + val partitionFilters = filters.collect { + case e @ EqualTo(attr, value) if partitionKeys.contains(attr) => + logInfo(s"Parquet scan partition filter: $attr=$value") + (p: Partition) => p.partitionValues(attr) == value + + case e @ In(attr, values) if partitionKeys.contains(attr) => + logInfo(s"Parquet scan partition filter: $attr IN ${values.mkString("{", ",", "}")}") + val set = values.toSet + (p: Partition) => set.contains(p.partitionValues(attr)) + + case e @ GreaterThan(attr, value) if partitionKeys.contains(attr) => + logInfo(s"Parquet scan partition filter: $attr > $value") + (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] > value.asInstanceOf[Int] + + case e @ GreaterThanOrEqual(attr, value) if partitionKeys.contains(attr) => + logInfo(s"Parquet scan partition filter: $attr >= $value") + (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] >= value.asInstanceOf[Int] + + case e @ LessThan(attr, value) if partitionKeys.contains(attr) => + logInfo(s"Parquet scan partition filter: $attr < $value") + (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] < value.asInstanceOf[Int] + + case e @ LessThanOrEqual(attr, value) if partitionKeys.contains(attr) => + logInfo(s"Parquet scan partition filter: $attr <= $value") + (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] <= value.asInstanceOf[Int] + } + + val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p))) + val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration) + val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath)) + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles:_*) + + // Push down filters when possible + predicates + .reduceOption(And) + .flatMap(ParquetFilters.createFilter) + .filter(_ => 
sqlContext.parquetFilterPushDown) + .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _)) + + def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100 + logInfo(s"Reading $percentRead% of $path partitions") + + // Store both requested and original schema in `Configuration` + jobConf.set( + RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetTypesConverter.convertToString(requestedSchema.toAttributes)) + jobConf.set( + RowWriteSupport.SPARK_ROW_SCHEMA, + ParquetTypesConverter.convertToString(schema.toAttributes)) + + // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata + val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean + jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString) + + val baseRDD = + new org.apache.spark.rdd.NewHadoopRDD( + sparkContext, + classOf[FilteringParquetRowInputFormat], + classOf[Void], + classOf[Row], + jobConf) { + val cacheMetadata = useCache + + @transient + val cachedStatus = selectedPartitions.flatMap(_.files) + + // Overridden so we can inject our own cached files statuses. + override def getPartitions: Array[SparkPartition] = { + val inputFormat = + if (cacheMetadata) { + new FilteringParquetRowInputFormat { + override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus + } + } else { + new FilteringParquetRowInputFormat + } + + inputFormat match { + case configurable: Configurable => + configurable.setConf(getConf) + case _ => + } + val jobContext = newJobContext(getConf, jobId) + val rawSplits = inputFormat.getSplits(jobContext).toArray + val result = new Array[SparkPartition](rawSplits.size) + for (i <- 0 until rawSplits.size) { + result(i) = + new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } + result + } + } + + // The ordinal for the partition key in the result row, if requested. + val partitionKeyLocation = + partitionKeys + .headOption + .map(requiredColumns.indexOf(_)) + .getOrElse(-1) + + // When the data does not include the key and the key is requested then we must fill it in + // based on information from the input split. 
+ if (!dataIncludesKey && partitionKeyLocation != -1) { + baseRDD.mapPartitionsWithInputSplit { case (split, iter) => + val partValue = "([^=]+)=([^=]+)".r + val partValues = + split.asInstanceOf[parquet.hadoop.ParquetInputSplit] + .getPath + .toString + .split("/") + .flatMap { + case partValue(key, value) => Some(key -> value) + case _ => None + }.toMap + + val currentValue = partValues.values.head.toInt + iter.map { pair => + val res = pair._2.asInstanceOf[SpecificMutableRow] + res.setInt(partitionKeyLocation, currentValue) + res + } + } + } else { + baseRDD.map(_._2) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 954e86822d..37853d4d03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -31,6 +31,13 @@ import org.apache.spark.sql.execution.SparkPlan */ private[sql] object DataSourceStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) => + pruneFilterProjectRaw( + l, + projectList, + filters, + (a, f) => t.buildScan(a, f)) :: Nil + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedFilteredScan)) => pruneFilterProject( l, @@ -51,19 +58,35 @@ private[sql] object DataSourceStrategy extends Strategy { case _ => Nil } + // Based on Public API. protected def pruneFilterProject( - relation: LogicalRelation, - projectList: Seq[NamedExpression], - filterPredicates: Seq[Expression], - scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = { + relation: LogicalRelation, + projectList: Seq[NamedExpression], + filterPredicates: Seq[Expression], + scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = { + pruneFilterProjectRaw( + relation, + projectList, + filterPredicates, + (requestedColumns, pushedFilters) => { + scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray) + }) + } + + // Based on Catalyst expressions. + protected def pruneFilterProjectRaw( + relation: LogicalRelation, + projectList: Seq[NamedExpression], + filterPredicates: Seq[Expression], + scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = { val projectSet = AttributeSet(projectList.flatMap(_.references)) val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) val filterCondition = filterPredicates.reduceLeftOption(And) - val pushedFilters = selectFilters(filterPredicates.map { _ transform { + val pushedFilters = filterPredicates.map { _ transform { case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes. - }}).toArray + }} if (projectList.map(_.toAttribute) == projectList && projectSet.size == projectList.size && @@ -74,8 +97,6 @@ private[sql] object DataSourceStrategy extends Strategy { val requestedColumns = projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above. .map(relation.attributeMap) // Match original case of attributes. 
- .map(_.name) - .toArray val scan = execution.PhysicalRDD( @@ -84,14 +105,14 @@ private[sql] object DataSourceStrategy extends Strategy { filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq - val columnNames = requestedColumns.map(_.name).toArray - val scan = execution.PhysicalRDD(requestedColumns, scanBuilder(columnNames, pushedFilters)) + val scan = + execution.PhysicalRDD(requestedColumns, scanBuilder(requestedColumns, pushedFilters)) execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } } - protected def selectFilters(filters: Seq[Expression]): Seq[Filter] = filters.collect { + protected[sql] def selectFilters(filters: Seq[Expression]): Seq[Filter] = filters.collect { case expressions.EqualTo(a: Attribute, Literal(v, _)) => EqualTo(a.name, v) case expressions.EqualTo(Literal(v, _), a: Attribute) => EqualTo(a.name, v) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 861638b1e9..2b8fc05fc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -16,12 +16,13 @@ */ package org.apache.spark.sql.sources -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType} import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} /** + * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source. When * Spark SQL is given a DDL operation with a USING clause specified, this interface is used to * pass in the parameters specified by a user. @@ -40,6 +41,7 @@ trait RelationProvider { } /** + * ::DeveloperApi:: * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must * be able to produce the schema of their data in the form of a [[StructType]] Concrete * implementation should inherit from one of the descendant `Scan` classes, which define various @@ -65,6 +67,7 @@ abstract class BaseRelation { } /** + * ::DeveloperApi:: * A BaseRelation that can produce all of its tuples as an RDD of Row objects. */ @DeveloperApi @@ -73,6 +76,7 @@ abstract class TableScan extends BaseRelation { } /** + * ::DeveloperApi:: * A BaseRelation that can eliminate unneeded columns before producing an RDD * containing all of its tuples as Row objects. */ @@ -82,6 +86,7 @@ abstract class PrunedScan extends BaseRelation { } /** + * ::DeveloperApi:: * A BaseRelation that can eliminate unneeded columns and filter using selected * predicates before producing an RDD containing all matching tuples as Row objects. * @@ -93,3 +98,18 @@ abstract class PrunedScan extends BaseRelation { abstract class PrunedFilteredScan extends BaseRelation { def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] } + +/** + * ::Experimental:: + * An interface for experimenting with a more direct connection to the query planner. Compared to + * [[PrunedFilteredScan]], this operator receives the raw expressions from the + * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs this + * interface is not designed to be binary compatible across releases and thus should only be used + * for experimentation. 
+ */ +@Experimental +abstract class CatalystScan extends BaseRelation { + def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] +} + + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index cc65242c0d..7159ebd035 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -34,71 +34,52 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) /** - * Tests for our SerDe -> Native parquet scan conversion. + * A suite to test the automatic conversion of metastore tables with parquet data to use the + * built in parquet support. */ -class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll { +class ParquetMetastoreSuite extends ParquetTest { override def beforeAll(): Unit = { - val partitionedTableDir = File.createTempFile("parquettests", "sparksql") - partitionedTableDir.delete() - partitionedTableDir.mkdir() - - (1 to 10).foreach { p => - val partDir = new File(partitionedTableDir, s"p=$p") - sparkContext.makeRDD(1 to 10) - .map(i => ParquetData(i, s"part-$p")) - .saveAsParquetFile(partDir.getCanonicalPath) - } - - val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql") - partitionedTableDirWithKey.delete() - partitionedTableDirWithKey.mkdir() - - (1 to 10).foreach { p => - val partDir = new File(partitionedTableDirWithKey, s"p=$p") - sparkContext.makeRDD(1 to 10) - .map(i => ParquetDataWithKey(p, i, s"part-$p")) - .saveAsParquetFile(partDir.getCanonicalPath) - } + super.beforeAll() sql(s""" - create external table partitioned_parquet - ( - intField INT, - stringField STRING - ) - PARTITIONED BY (p int) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDir.getCanonicalPath}' + create external table partitioned_parquet + ( + intField INT, + stringField STRING + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDir.getCanonicalPath}' """) sql(s""" - create external table partitioned_parquet_with_key - ( - intField INT, - stringField STRING - ) - PARTITIONED BY (p int) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDirWithKey.getCanonicalPath}' + create external table partitioned_parquet_with_key + ( + intField INT, + stringField STRING + ) + PARTITIONED BY (p int) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${partitionedTableDirWithKey.getCanonicalPath}' """) sql(s""" - create external table normal_parquet - ( - intField INT, - stringField STRING - ) - 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${new File(partitionedTableDir, "p=1").getCanonicalPath}' + create external table normal_parquet + ( + intField INT, + stringField STRING + ) + ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + location '${new File(partitionedTableDir, "p=1").getCanonicalPath}' """) (1 to 10).foreach { p => @@ -116,6 +97,82 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll { setConf("spark.sql.hive.convertMetastoreParquet", "false") } + test("conversion is working") { + assert( + sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { + case _: HiveTableScan => true + }.isEmpty) + assert( + sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { + case _: ParquetTableScan => true + }.nonEmpty) + } +} + +/** + * A suite of tests for the Parquet support through the data sources API. + */ +class ParquetSourceSuite extends ParquetTest { + override def beforeAll(): Unit = { + super.beforeAll() + + sql( s""" + create temporary table partitioned_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDir.getCanonicalPath}' + ) + """) + + sql( s""" + create temporary table partitioned_parquet_with_key + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithKey.getCanonicalPath}' + ) + """) + + sql( s""" + create temporary table normal_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${new File(partitionedTableDir, "p=1").getCanonicalPath}' + ) + """) + } +} + +/** + * A collection of tests for parquet data with various forms of partitioning. + */ +abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { + var partitionedTableDir: File = null + var partitionedTableDirWithKey: File = null + + override def beforeAll(): Unit = { + partitionedTableDir = File.createTempFile("parquettests", "sparksql") + partitionedTableDir.delete() + partitionedTableDir.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDir, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-$p")) + .saveAsParquetFile(partDir.getCanonicalPath) + } + + partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql") + partitionedTableDirWithKey.delete() + partitionedTableDirWithKey.mkdir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKey, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetDataWithKey(p, i, s"part-$p")) + .saveAsParquetFile(partDir.getCanonicalPath) + } + } + Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table => test(s"project the partitioning column $table") { checkAnswer( @@ -193,15 +250,4 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll { sql("SELECT COUNT(*) FROM normal_parquet"), 10) } - - test("conversion is working") { - assert( - sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { - case _: HiveTableScan => true - }.isEmpty) - assert( - sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { - case _: ParquetTableScan => true - }.nonEmpty) - } } |
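Finally, `interfaces.scala` above introduces the experimental `CatalystScan` API. A minimal, hypothetical relation written against it might look roughly like the following (class name, schema, and data are placeholders, and the raw catalyst `filters` are simply ignored):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources.CatalystScan

// Hypothetical relation with a single integer column. Because it extends
// CatalystScan, buildScan receives raw catalyst expressions rather than the
// translated org.apache.spark.sql.sources.Filter objects.
case class TinyRelation(@transient sqlContext: SQLContext) extends CatalystScan {
  override val schema = StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
    // A real source would use `requiredColumns` and `filters` for pruning and
    // push-down; this sketch just returns ten rows.
    sqlContext.sparkContext.parallelize(1 to 10).map(Row(_))
  }
}
```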