author     Michael Armbrust <michael@databricks.com>  2014-11-20 18:31:02 -0800
committer  Michael Armbrust <michael@databricks.com>  2014-11-20 18:31:02 -0800
commit     02ec058efe24348cdd3691b55942e6f0ef138732 (patch)
tree       aeb665a9e313f6dfe9de73329987c26762537c8f /sql
parent     84d79ee9ec47465269f7b0a7971176da93c96f3f (diff)
[SPARK-4413][SQL] Parquet support through datasource API
Goals:
- Support for accessing parquet using SQL but not requiring Hive (thus allowing support of parquet tables with decimal columns)
- Support for folder based partitioning with automatic discovery of available partitions
- Caching of file metadata

See scaladoc of `ParquetRelation2` for more details.

Author: Michael Armbrust <michael@databricks.com>

Closes #3269 from marmbrus/newParquet and squashes the following commits:

1dd75f1 [Michael Armbrust] Pass all paths for FileInputFormat at once.
645768b [Michael Armbrust] Review comments.
abd8e2f [Michael Armbrust] Alternative implementation of parquet based on the datasources API.
938019e [Michael Armbrust] Add an experimental interface to data sources that exposes catalyst expressions.
e9d2641 [Michael Armbrust] logging / formatting improvements.
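
For orientation, the usage this patch enables (and that ParquetSourceSuite below exercises) looks roughly like the following sketch; the table name `events`, the path, and the `sqlContext` instance are placeholders, not names from the patch:

    // Minimal sketch of the DDL path added by this patch; /data/events is a
    // hypothetical directory of (optionally partitioned) parquet files.
    sqlContext.sql("""
      CREATE TEMPORARY TABLE events
      USING org.apache.spark.sql.parquet
      OPTIONS (
        path '/data/events'
      )
    """)

    // Plain SQL over parquet, no Hive metastore required.
    sqlContext.sql("SELECT COUNT(*) FROM events").collect()
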
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala  |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala              | 290
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala      |  43
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala              |  22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala) | 178
5 files changed, 458 insertions, 79 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5d0643a64a..0e36852ddd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -361,7 +361,7 @@ private[parquet] class FilteringParquetRowInputFormat
private var footers: JList[Footer] = _
- private var fileStatuses= Map.empty[Path, FileStatus]
+ private var fileStatuses = Map.empty[Path, FileStatus]
override def createRecordReader(
inputSplit: InputSplit,
@@ -405,7 +405,9 @@ private[parquet] class FilteringParquetRowInputFormat
}
val newFooters = new mutable.HashMap[FileStatus, Footer]
if (toFetch.size > 0) {
+ val startFetch = System.currentTimeMillis
val fetched = getFooters(conf, toFetch)
+ logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
for ((status, i) <- toFetch.zipWithIndex) {
newFooters(status) = fetched.get(i)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
new file mode 100644
index 0000000000..bea12e6dd6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+
+import parquet.hadoop.ParquetInputFormat
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.{Partition => SparkPartition, Logging}
+import org.apache.spark.rdd.{NewHadoopPartition, RDD}
+
+import org.apache.spark.sql.{SQLConf, Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, And, Expression, Attribute}
+import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.sources._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Allows creation of parquet based tables using the syntax
+ * `CREATE TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option required
+ * is `path`, which should be the location of a collection of, optionally partitioned,
+ * parquet files.
+ */
+class DefaultSource extends RelationProvider {
+ /** Returns a new base relation with the given parameters. */
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ val path =
+ parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
+
+ ParquetRelation2(path)(sqlContext)
+ }
+}
+
+private[parquet] case class Partition(partitionValues: Map[String, Any], files: Seq[FileStatus])
+
+/**
+ * An alternative to [[ParquetRelation]] that plugs in using the data sources API. This class is
+ * currently not intended as a full replacement of the parquet support in Spark SQL, though it is
+ * likely that it will eventually subsume the existing physical plan implementation.
+ *
+ * Compared with the current implementation, this class has the following notable differences:
+ *
+ * Partitioning: Partitions are auto-discovered and must be in the form of directories `key=value/`
+ * located at `path`. Currently only a single partitioning column is supported and it must
+ * be an integer. This class supports both fully self-describing data, which contains the partition
+ * key, and data where the partition key is only present in the folder structure. The presence
+ * of the partitioning key in the data is also auto-detected. The `null` partition is not yet
+ * supported.
+ *
+ * Metadata: The metadata is automatically discovered by reading the first parquet file present.
+ * There is currently no support for working with files that have different schemas. Additionally,
+ * when parquet metadata caching is turned on, the FileStatus objects for all data will be cached
+ * to improve the speed of interactive querying. When data is added to a table, it must be dropped
+ * and recreated to pick up any changes.
+ *
+ * Statistics: Statistics for the size of the table are automatically populated during metadata
+ * discovery.
+ */
+@DeveloperApi
+case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
+ extends CatalystScan with Logging {
+
+ def sparkContext = sqlContext.sparkContext
+
+ // Minor hack: Scala doesn't seem to respect @transient for vals declared via extraction
+ @transient
+ private var partitionKeys: Seq[String] = _
+ @transient
+ private var partitions: Seq[Partition] = _
+ discoverPartitions()
+
+ // TODO: Only finds the first partition, assumes the key is of type Integer...
+ private def discoverPartitions() = {
+ val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
+ val partValue = "([^=]+)=([^=]+)".r
+
+ val childrenOfPath = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.startsWith("_"))
+ val childDirs = childrenOfPath.filter(s => s.isDir)
+
+ if (childDirs.size > 0) {
+ val partitionPairs = childDirs.map(_.getPath.getName).map {
+ case partValue(key, value) => (key, value)
+ }
+
+ val foundKeys = partitionPairs.map(_._1).distinct
+ if (foundKeys.size > 1) {
+ sys.error(s"Too many distinct partition keys: $foundKeys")
+ }
+
+ // Do a parallel lookup of partition metadata.
+ val partitionFiles =
+ childDirs.par.map { d =>
+ fs.listStatus(d.getPath)
+ // TODO: Is there a standard hadoop function for this?
+ .filterNot(_.getPath.getName.startsWith("_"))
+ .filterNot(_.getPath.getName.startsWith("."))
+ }.seq
+
+ partitionKeys = foundKeys.toSeq
+ partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, value)) =>
+ Partition(Map(key -> value.toInt), files)
+ }.toSeq
+ } else {
+ partitionKeys = Nil
+ partitions = Partition(Map.empty, childrenOfPath) :: Nil
+ }
+ }
+
+ override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
+
+ val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
+ ParquetTypesConverter.readSchemaFromFile(
+ partitions.head.files.head.getPath,
+ Some(sparkContext.hadoopConfiguration),
+ sqlContext.isParquetBinaryAsString))
+
+ val dataIncludesKey =
+ partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
+
+ override val schema =
+ if (dataIncludesKey) {
+ dataSchema
+ } else {
+ StructType(dataSchema.fields :+ StructField(partitionKeys.head, IntegerType))
+ }
+
+ override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
+ // This is mostly a hack so that we can use the existing parquet filter code.
+ val requiredColumns = output.map(_.name)
+ // TODO: Parquet filters should be based on data sources API, not catalyst expressions.
+ val filters = DataSourceStrategy.selectFilters(predicates)
+
+ val job = new Job(sparkContext.hadoopConfiguration)
+ ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+ val jobConf: Configuration = ContextUtil.getConfiguration(job)
+
+ val requestedSchema = StructType(requiredColumns.map(schema(_)))
+
+ // TODO: Make folder based partitioning a first class citizen of the Data Sources API.
+ val partitionFilters = filters.collect {
+ case e @ EqualTo(attr, value) if partitionKeys.contains(attr) =>
+ logInfo(s"Parquet scan partition filter: $attr=$value")
+ (p: Partition) => p.partitionValues(attr) == value
+
+ case e @ In(attr, values) if partitionKeys.contains(attr) =>
+ logInfo(s"Parquet scan partition filter: $attr IN ${values.mkString("{", ",", "}")}")
+ val set = values.toSet
+ (p: Partition) => set.contains(p.partitionValues(attr))
+
+ case e @ GreaterThan(attr, value) if partitionKeys.contains(attr) =>
+ logInfo(s"Parquet scan partition filter: $attr > $value")
+ (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] > value.asInstanceOf[Int]
+
+ case e @ GreaterThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
+ logInfo(s"Parquet scan partition filter: $attr >= $value")
+ (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] >= value.asInstanceOf[Int]
+
+ case e @ LessThan(attr, value) if partitionKeys.contains(attr) =>
+ logInfo(s"Parquet scan partition filter: $attr < $value")
+ (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] < value.asInstanceOf[Int]
+
+ case e @ LessThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
+ logInfo(s"Parquet scan partition filter: $attr <= $value")
+ (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] <= value.asInstanceOf[Int]
+ }
+
+ val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p)))
+ val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
+ val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath))
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles:_*)
+
+ // Push down filters when possible
+ predicates
+ .reduceOption(And)
+ .flatMap(ParquetFilters.createFilter)
+ .filter(_ => sqlContext.parquetFilterPushDown)
+ .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+
+ def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
+ logInfo(s"Reading $percentRead% of $path partitions")
+
+ // Store both requested and original schema in `Configuration`
+ jobConf.set(
+ RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ ParquetTypesConverter.convertToString(requestedSchema.toAttributes))
+ jobConf.set(
+ RowWriteSupport.SPARK_ROW_SCHEMA,
+ ParquetTypesConverter.convertToString(schema.toAttributes))
+
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+ jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString)
+
+ val baseRDD =
+ new org.apache.spark.rdd.NewHadoopRDD(
+ sparkContext,
+ classOf[FilteringParquetRowInputFormat],
+ classOf[Void],
+ classOf[Row],
+ jobConf) {
+ val cacheMetadata = useCache
+
+ @transient
+ val cachedStatus = selectedPartitions.flatMap(_.files)
+
+ // Overridden so we can inject our own cached file statuses.
+ override def getPartitions: Array[SparkPartition] = {
+ val inputFormat =
+ if (cacheMetadata) {
+ new FilteringParquetRowInputFormat {
+ override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+ }
+ } else {
+ new FilteringParquetRowInputFormat
+ }
+
+ inputFormat match {
+ case configurable: Configurable =>
+ configurable.setConf(getConf)
+ case _ =>
+ }
+ val jobContext = newJobContext(getConf, jobId)
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val result = new Array[SparkPartition](rawSplits.size)
+ for (i <- 0 until rawSplits.size) {
+ result(i) =
+ new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ result
+ }
+ }
+
+ // The ordinal for the partition key in the result row, if requested.
+ val partitionKeyLocation =
+ partitionKeys
+ .headOption
+ .map(requiredColumns.indexOf(_))
+ .getOrElse(-1)
+
+ // When the data does not include the key and the key is requested then we must fill it in
+ // based on information from the input split.
+ if (!dataIncludesKey && partitionKeyLocation != -1) {
+ baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+ val partValue = "([^=]+)=([^=]+)".r
+ val partValues =
+ split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+ .getPath
+ .toString
+ .split("/")
+ .flatMap {
+ case partValue(key, value) => Some(key -> value)
+ case _ => None
+ }.toMap
+
+ val currentValue = partValues.values.head.toInt
+ iter.map { pair =>
+ val res = pair._2.asInstanceOf[SpecificMutableRow]
+ res.setInt(partitionKeyLocation, currentValue)
+ res
+ }
+ }
+ } else {
+ baseRDD.map(_._2)
+ }
+ }
+}
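
The ParquetRelation2 scaladoc above describes the layout it auto-discovers: `key=value/` directories under `path` with a single integer partition key. A rough, hypothetical illustration (the directory names, table, and columns are not from the patch): comparison predicates on the partition key become the partition filters collected in buildScan, so non-matching directories are never handed to the input format.

    // Hypothetical layout under /data/events, one integer partition key `p`:
    //   /data/events/p=1/part-r-00000.parquet
    //   /data/events/p=2/part-r-00000.parquet
    //   /data/events/p=3/part-r-00000.parquet
    // Entries whose names start with "_" are skipped during discovery.
    sqlContext.sql("""
      CREATE TEMPORARY TABLE events
      USING org.apache.spark.sql.parquet
      OPTIONS (path '/data/events')
    """)

    // Only the directories matching p >= 2 are scanned; buildScan logs the
    // percentage of partitions actually read.
    sqlContext.sql("SELECT intField FROM events WHERE p >= 2").collect()
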
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 954e86822d..37853d4d03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -31,6 +31,13 @@ import org.apache.spark.sql.execution.SparkPlan
*/
private[sql] object DataSourceStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>
+ pruneFilterProjectRaw(
+ l,
+ projectList,
+ filters,
+ (a, f) => t.buildScan(a, f)) :: Nil
+
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
pruneFilterProject(
l,
@@ -51,19 +58,35 @@ private[sql] object DataSourceStrategy extends Strategy {
case _ => Nil
}
+ // Based on Public API.
protected def pruneFilterProject(
- relation: LogicalRelation,
- projectList: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
+ relation: LogicalRelation,
+ projectList: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
+ pruneFilterProjectRaw(
+ relation,
+ projectList,
+ filterPredicates,
+ (requestedColumns, pushedFilters) => {
+ scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)
+ })
+ }
+
+ // Based on Catalyst expressions.
+ protected def pruneFilterProjectRaw(
+ relation: LogicalRelation,
+ projectList: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {
val projectSet = AttributeSet(projectList.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition = filterPredicates.reduceLeftOption(And)
- val pushedFilters = selectFilters(filterPredicates.map { _ transform {
+ val pushedFilters = filterPredicates.map { _ transform {
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
- }}).toArray
+ }}
if (projectList.map(_.toAttribute) == projectList &&
projectSet.size == projectList.size &&
@@ -74,8 +97,6 @@ private[sql] object DataSourceStrategy extends Strategy {
val requestedColumns =
projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
.map(relation.attributeMap) // Match original case of attributes.
- .map(_.name)
- .toArray
val scan =
execution.PhysicalRDD(
@@ -84,14 +105,14 @@ private[sql] object DataSourceStrategy extends Strategy {
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
- val columnNames = requestedColumns.map(_.name).toArray
- val scan = execution.PhysicalRDD(requestedColumns, scanBuilder(columnNames, pushedFilters))
+ val scan =
+ execution.PhysicalRDD(requestedColumns, scanBuilder(requestedColumns, pushedFilters))
execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
}
- protected def selectFilters(filters: Seq[Expression]): Seq[Filter] = filters.collect {
+ protected[sql] def selectFilters(filters: Seq[Expression]): Seq[Filter] = filters.collect {
case expressions.EqualTo(a: Attribute, Literal(v, _)) => EqualTo(a.name, v)
case expressions.EqualTo(Literal(v, _), a: Attribute) => EqualTo(a.name, v)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 861638b1e9..2b8fc05fc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -16,12 +16,13 @@
*/
package org.apache.spark.sql.sources
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType}
import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
/**
+ * ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
* pass in the parameters specified by a user.
@@ -40,6 +41,7 @@ trait RelationProvider {
}
/**
+ * ::DeveloperApi::
* Represents a collection of tuples with a known schema. Classes that extend BaseRelation must
* be able to produce the schema of their data in the form of a [[StructType]]. Concrete
* implementations should inherit from one of the descendant `Scan` classes, which define various
@@ -65,6 +67,7 @@ abstract class BaseRelation {
}
/**
+ * ::DeveloperApi::
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*/
@DeveloperApi
@@ -73,6 +76,7 @@ abstract class TableScan extends BaseRelation {
}
/**
+ * ::DeveloperApi::
* A BaseRelation that can eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects.
*/
@@ -82,6 +86,7 @@ abstract class PrunedScan extends BaseRelation {
}
/**
+ * ::DeveloperApi::
* A BaseRelation that can eliminate unneeded columns and filter using selected
* predicates before producing an RDD containing all matching tuples as Row objects.
*
@@ -93,3 +98,18 @@ abstract class PrunedScan extends BaseRelation {
abstract class PrunedFilteredScan extends BaseRelation {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
+
+/**
+ * ::Experimental::
+ * An interface for experimenting with a more direct connection to the query planner. Compared to
+ * [[PrunedFilteredScan]], this operator receives the raw expressions from the
+ * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. Unlike the other APIs, this
+ * interface is not designed to be binary compatible across releases and thus should only be used
+ * for experimentation.
+ */
+@Experimental
+abstract class CatalystScan extends BaseRelation {
+ def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
+}
+
+
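
For context, a minimal sketch of a relation built on this new hook (not part of the patch; the class name, column, and data are hypothetical, and only the buildScan signature plus the BaseRelation contract come from interfaces.scala):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
    import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
    import org.apache.spark.sql.sources.CatalystScan

    // Produces the rows 1..last as a single-column relation. The pushed-down
    // catalyst expressions are ignored here; a real source would inspect
    // `filters`, as ParquetRelation2 does for its partition predicates.
    case class RangeRelation(last: Int)(@transient val sqlContext: SQLContext)
      extends CatalystScan {

      override val schema = StructType(StructField("id", IntegerType) :: Nil)

      override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] =
        sqlContext.sparkContext.parallelize(1 to last).map(i => Row(i))
    }

In the planner change above, DataSourceStrategy matches LogicalRelation(t: CatalystScan) and routes it through pruneFilterProjectRaw; exposing such a relation through SQL additionally requires a RelationProvider, as DefaultSource does for parquet.
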
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index cc65242c0d..7159ebd035 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -34,71 +34,52 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
/**
- * Tests for our SerDe -> Native parquet scan conversion.
+ * A suite to test the automatic conversion of metastore tables with parquet data to use the
+ * built-in parquet support.
*/
-class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
+class ParquetMetastoreSuite extends ParquetTest {
override def beforeAll(): Unit = {
- val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
- partitionedTableDir.delete()
- partitionedTableDir.mkdir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDir, s"p=$p")
- sparkContext.makeRDD(1 to 10)
- .map(i => ParquetData(i, s"part-$p"))
- .saveAsParquetFile(partDir.getCanonicalPath)
- }
-
- val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
- partitionedTableDirWithKey.delete()
- partitionedTableDirWithKey.mkdir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDirWithKey, s"p=$p")
- sparkContext.makeRDD(1 to 10)
- .map(i => ParquetDataWithKey(p, i, s"part-$p"))
- .saveAsParquetFile(partDir.getCanonicalPath)
- }
+ super.beforeAll()
sql(s"""
- create external table partitioned_parquet
- (
- intField INT,
- stringField STRING
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDir.getCanonicalPath}'
+ create external table partitioned_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDir.getCanonicalPath}'
""")
sql(s"""
- create external table partitioned_parquet_with_key
- (
- intField INT,
- stringField STRING
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDirWithKey.getCanonicalPath}'
+ create external table partitioned_parquet_with_key
+ (
+ intField INT,
+ stringField STRING
+ )
+ PARTITIONED BY (p int)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${partitionedTableDirWithKey.getCanonicalPath}'
""")
sql(s"""
- create external table normal_parquet
- (
- intField INT,
- stringField STRING
- )
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ create external table normal_parquet
+ (
+ intField INT,
+ stringField STRING
+ )
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ STORED AS
+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
""")
(1 to 10).foreach { p =>
@@ -116,6 +97,82 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}
+ test("conversion is working") {
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: HiveTableScan => true
+ }.isEmpty)
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ case _: ParquetTableScan => true
+ }.nonEmpty)
+ }
+}
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class ParquetSourceSuite extends ParquetTest {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ sql( s"""
+ create temporary table partitioned_parquet
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${partitionedTableDir.getCanonicalPath}'
+ )
+ """)
+
+ sql( s"""
+ create temporary table partitioned_parquet_with_key
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${partitionedTableDirWithKey.getCanonicalPath}'
+ )
+ """)
+
+ sql( s"""
+ create temporary table normal_parquet
+ USING org.apache.spark.sql.parquet
+ OPTIONS (
+ path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ )
+ """)
+ }
+}
+
+/**
+ * A collection of tests for parquet data with various forms of partitioning.
+ */
+abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
+ var partitionedTableDir: File = null
+ var partitionedTableDirWithKey: File = null
+
+ override def beforeAll(): Unit = {
+ partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDir.delete()
+ partitionedTableDir.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDir, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetData(i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+
+ partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+ partitionedTableDirWithKey.delete()
+ partitionedTableDirWithKey.mkdir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+ .saveAsParquetFile(partDir.getCanonicalPath)
+ }
+ }
+
Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
test(s"project the partitioning column $table") {
checkAnswer(
@@ -193,15 +250,4 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
sql("SELECT COUNT(*) FROM normal_parquet"),
10)
}
-
- test("conversion is working") {
- assert(
- sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
- case _: HiveTableScan => true
- }.isEmpty)
- assert(
- sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
- case _: ParquetTableScan => true
- }.nonEmpty)
- }
}