path: root/sql/core
author      Cheng Lian <lian@databricks.com>    2015-02-16 01:38:31 -0800
committer   Cheng Lian <lian@databricks.com>    2015-02-16 01:38:31 -0800
commit      3ce58cf9c0ffe8b867ca79b404fe3fa291cf0e56 (patch)
tree        a583c820c1cecd46fb021323d88ac3e50af01b98 /sql/core
parent      199a9e80275ac70582ea32f0f2f5a0a15b168785 (diff)
download    spark-3ce58cf9c0ffe8b867ca79b404fe3fa291cf0e56.tar.gz
            spark-3ce58cf9c0ffe8b867ca79b404fe3fa291cf0e56.tar.bz2
            spark-3ce58cf9c0ffe8b867ca79b404fe3fa291cf0e56.zip
[SPARK-4553] [SPARK-5767] [SQL] Wires Parquet data source with the newly introduced write support for data source API
This PR migrates the Parquet data source to the new data source write support API. Users can now also overwrite and append to existing tables. Note that inserting into partitioned tables is not supported yet.

When the Parquet data source is enabled, insertion into Hive Metastore Parquet tables is also handled by the Parquet data source. This is done by the newly introduced `HiveMetastoreCatalog.ParquetConversions` rule, which is a "proper" implementation of the original hacky `HiveStrategies.ParquetConversion`. The latter is still preserved, and can be removed together with the old Parquet support in the future.

TODO:

- [x] Update outdated comments in `newParquet.scala`.

Author: Cheng Lian <lian@databricks.com>

Closes #4563 from liancheng/parquet-refining and squashes the following commits:

fa98d27 [Cheng Lian] Fixes test cases that should disable the Parquet data source
2476e82 [Cheng Lian] Fixes a compilation error introduced during rebasing
a83d290 [Cheng Lian] Passes Hive Metastore partitioning information to ParquetRelation2
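As a rough illustration of the new write path (a minimal sketch; the SQLContext, DataFrame, and destination path here are assumptions, not taken from this patch), a DataFrame can now be written to and appended to an existing Parquet destination through the data source API:

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

    // Assumes an already constructed SQLContext and an input DataFrame `df`.
    def writeTwice(sqlContext: SQLContext, df: DataFrame): Unit = {
      // The first write replaces whatever is at the (hypothetical) destination path.
      df.save("/tmp/parquet-table", "org.apache.spark.sql.parquet", SaveMode.Overwrite)
      // The second write appends to the files created above, which this patch enables.
      df.save("/tmp/parquet-table", "org.apache.spark.sql.parquet", SaveMode.Append)
    }

The same `save(path, source, mode)` call shape is what the updated `ParquetTest.makeParquetFile` helper in the diff below relies on.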
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                              |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala          |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala                     |  38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala                      | 265
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala              | 343
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala                  | 417
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala  | 202
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala               | 147
8 files changed, 891 insertions, 525 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d20b42de22..b42a52ebd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -446,7 +446,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
} else {
DataFrame(this, parquet.ParquetRelation(
- paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+ (path +: paths).mkString(","), Some(sparkContext.hadoopConfiguration), this))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 28cd17fde4..7dd8bea49b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -647,6 +647,6 @@ private[parquet] object FileSystemHelper {
sys.error("ERROR: attempting to append to set of Parquet files and found file" +
s"that does not match name pattern: $other")
case _ => 0
- }.reduceLeft((a, b) => if (a < b) b else a)
+ }.reduceOption(_ max _).getOrElse(0)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 538d774eb9..d0856df8d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -23,8 +23,8 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try
-import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.util
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.util.Utils
/**
@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
trait ParquetTest {
val sqlContext: SQLContext
- import sqlContext._
+ import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
+ import sqlContext.{conf, sparkContext}
protected def configuration = sparkContext.hadoopConfiguration
@@ -49,11 +50,11 @@ trait ParquetTest {
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
- val currentValues = keys.map(key => Try(getConf(key)).toOption)
- (keys, values).zipped.foreach(setConf)
+ val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
+ (keys, values).zipped.foreach(conf.setConf)
try f finally {
keys.zip(currentValues).foreach {
- case (key, Some(value)) => setConf(key, value)
+ case (key, Some(value)) => conf.setConf(key, value)
case (key, None) => conf.unsetConf(key)
}
}
@@ -88,7 +89,6 @@ trait ParquetTest {
protected def withParquetFile[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: String => Unit): Unit = {
- import sqlContext.implicits._
withTempPath { file =>
sparkContext.parallelize(data).toDF().saveAsParquetFile(file.getCanonicalPath)
f(file.getCanonicalPath)
@@ -102,14 +102,14 @@ trait ParquetTest {
protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: DataFrame => Unit): Unit = {
- withParquetFile(data)(path => f(parquetFile(path)))
+ withParquetFile(data)(path => f(sqlContext.parquetFile(path)))
}
/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
- try f finally dropTempTable(tableName)
+ try f finally sqlContext.dropTempTable(tableName)
}
/**
@@ -125,4 +125,26 @@ trait ParquetTest {
withTempTable(tableName)(f)
}
}
+
+ protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+ data: Seq[T], path: File): Unit = {
+ data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+ }
+
+ protected def makePartitionDir(
+ basePath: File,
+ defaultPartitionName: String,
+ partitionCols: (String, Any)*): File = {
+ val partNames = partitionCols.map { case (k, v) =>
+ val valueString = if (v == null || v == "") defaultPartitionName else v.toString
+ s"$k=$valueString"
+ }
+
+ val partDir = partNames.foldLeft(basePath) { (parent, child) =>
+ new File(parent, child)
+ }
+
+ assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+ partDir
+ }
}
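For reference, a small self-contained sketch of the directory layout the new `makePartitionDir` helper builds (the base path, column names, and the `__HIVE_DEFAULT_PARTITION__` default name below are illustrative assumptions):

    import java.io.File

    // Mirrors the helper's foldLeft over (column, value) pairs: each pair becomes one
    // `key=value` path segment; nulls and empty strings fall back to the default name.
    def partitionPath(base: File, defaultPartitionName: String, cols: (String, Any)*): File =
      cols.foldLeft(base) { case (parent, (k, v)) =>
        val value = if (v == null || v == "") defaultPartitionName else v.toString
        new File(parent, s"$k=$value")
      }

    // partitionPath(new File("/tmp/t"), "__HIVE_DEFAULT_PARTITION__", "pi" -> 1, "ps" -> "foo")
    //   => /tmp/t/pi=1/ps=foo
    // partitionPath(new File("/tmp/t"), "__HIVE_DEFAULT_PARTITION__", "pi" -> 1, "ps" -> null)
    //   => /tmp/t/pi=1/ps=__HIVE_DEFAULT_PARTITION__

This is the same Hive-style `key=value` layout that `ParquetRelation2` discovers when scanning a partitioned table in the next file of this diff.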
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 3a9f060061..9279f5a903 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -20,7 +20,7 @@ import java.io.IOException
import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.text.SimpleDateFormat
-import java.util.{List => JList, Date}
+import java.util.{Date, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
@@ -34,8 +34,9 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop.{ParquetInputFormat, _}
+import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{ParquetInputFormat, _}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
@@ -44,21 +45,36 @@ import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.parquet.ParquetTypesConverter._
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLConf, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, _}
-import org.apache.spark.{Partition => SparkPartition, TaskContext, SerializableWritable, Logging, SparkException}
-
+import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWritable, SparkException, TaskContext}
/**
- * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
- * required is `path`, which should be the location of a collection of, optionally partitioned,
- * parquet files.
+ * Allows creation of Parquet based tables using the syntax:
+ * {{{
+ * CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet OPTIONS (...)
+ * }}}
+ *
+ * Supported options include:
+ *
+ * - `path`: Required. When reading Parquet files, `path` should point to the location of the
+ * Parquet file(s). It can be either a single raw Parquet file, or a directory of Parquet files.
+ * In the latter case, this data source tries to discover partitioning information if the
+ * directory is structured in the same style as Hive partitioned tables. When writing Parquet
+ * files, `path` should point to the destination folder.
+ *
+ * - `mergeSchema`: Optional. Indicates whether we should merge potentially different (but
+ * compatible) schemas stored in all Parquet part-files.
+ *
+ * - `partition.defaultName`: Optional. Partition name used when a value of a partition column is
+ * null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
+ * in Hive.
*/
class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {
+
private def checkPath(parameters: Map[String, String]): String = {
parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
}
@@ -70,6 +86,7 @@ class DefaultSource
ParquetRelation2(Seq(checkPath(parameters)), parameters, None)(sqlContext)
}
+ /** Returns a new base relation with the given parameters and schema. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
@@ -77,6 +94,7 @@ class DefaultSource
ParquetRelation2(Seq(checkPath(parameters)), parameters, Some(schema))(sqlContext)
}
+ /** Returns a new base relation with the given parameters and saves the given data into it. */
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
@@ -85,33 +103,19 @@ class DefaultSource
val path = checkPath(parameters)
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val doSave = if (fs.exists(filesystemPath)) {
- mode match {
- case SaveMode.Append =>
- sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
- case SaveMode.Overwrite =>
- fs.delete(filesystemPath, true)
- true
- case SaveMode.ErrorIfExists =>
- sys.error(s"path $path already exists.")
- case SaveMode.Ignore => false
- }
- } else {
- true
+ val doInsertion = (mode, fs.exists(filesystemPath)) match {
+ case (SaveMode.ErrorIfExists, true) =>
+ sys.error(s"path $path already exists.")
+ case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+ true
+ case (SaveMode.Ignore, exists) =>
+ !exists
}
- val relation = if (doSave) {
- // Only save data when the save mode is not ignore.
- ParquetRelation.createEmpty(
- path,
- data.schema.toAttributes,
- false,
- sqlContext.sparkContext.hadoopConfiguration,
- sqlContext)
-
- val createdRelation = createRelation(sqlContext, parameters, data.schema)
- createdRelation.asInstanceOf[ParquetRelation2].insert(data, true)
-
+ val relation = if (doInsertion) {
+ val createdRelation =
+ createRelation(sqlContext, parameters, data.schema).asInstanceOf[ParquetRelation2]
+ createdRelation.insert(data, overwrite = mode == SaveMode.Overwrite)
createdRelation
} else {
// If the save mode is Ignore, we will just create the relation based on existing data.
@@ -122,37 +126,31 @@ class DefaultSource
}
}
-private[parquet] case class Partition(values: Row, path: String)
+private[sql] case class Partition(values: Row, path: String)
-private[parquet] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
+private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
/**
* An alternative to [[ParquetRelation]] that plugs in using the data sources API. This class is
- * currently not intended as a full replacement of the parquet support in Spark SQL though it is
- * likely that it will eventually subsume the existing physical plan implementation.
- *
- * Compared with the current implementation, this class has the following notable differences:
- *
- * Partitioning: Partitions are auto discovered and must be in the form of directories `key=value/`
- * located at `path`. Currently only a single partitioning column is supported and it must
- * be an integer. This class supports both fully self-describing data, which contains the partition
- * key, and data where the partition key is only present in the folder structure. The presence
- * of the partitioning key in the data is also auto-detected. The `null` partition is not yet
- * supported.
+ * intended as a full replacement of the Parquet support in Spark SQL. The old implementation will
+ * be deprecated and eventually removed once this version proves to be stable enough.
*
- * Metadata: The metadata is automatically discovered by reading the first parquet file present.
- * There is currently no support for working with files that have different schema. Additionally,
- * when parquet metadata caching is turned on, the FileStatus objects for all data will be cached
- * to improve the speed of interactive querying. When data is added to a table it must be dropped
- * and recreated to pick up any changes.
+ * Compared with the old implementation, this class has the following notable differences:
*
- * Statistics: Statistics for the size of the table are automatically populated during metadata
- * discovery.
+ * - Partitioning discovery: Hive style multi-level partitions are auto discovered.
+ * - Metadata discovery: Parquet is a format that comes with schema evolution support. This data source
+ * can detect and merge schemas from all Parquet part-files as long as they are compatible.
+ * Also, metadata and [[FileStatus]]es are cached for better performance.
+ * - Statistics: Statistics for the size of the table are automatically populated during schema
+ * discovery.
*/
@DeveloperApi
-case class ParquetRelation2
- (paths: Seq[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None)
- (@transient val sqlContext: SQLContext)
+case class ParquetRelation2(
+ paths: Seq[String],
+ parameters: Map[String, String],
+ maybeSchema: Option[StructType] = None,
+ maybePartitionSpec: Option[PartitionSpec] = None)(
+ @transient val sqlContext: SQLContext)
extends CatalystScan
with InsertableRelation
with SparkHadoopMapReduceUtil
@@ -175,43 +173,90 @@ case class ParquetRelation2
override def equals(other: Any) = other match {
case relation: ParquetRelation2 =>
+ // If schema merging is required, we don't compare the actual schemas since they may evolve.
+ val schemaEquality = if (shouldMergeSchemas) {
+ shouldMergeSchemas == relation.shouldMergeSchemas
+ } else {
+ schema == relation.schema
+ }
+
paths.toSet == relation.paths.toSet &&
+ schemaEquality &&
maybeMetastoreSchema == relation.maybeMetastoreSchema &&
- (shouldMergeSchemas == relation.shouldMergeSchemas || schema == relation.schema)
+ maybePartitionSpec == relation.maybePartitionSpec
+
case _ => false
}
private[sql] def sparkContext = sqlContext.sparkContext
- @transient private val fs = FileSystem.get(sparkContext.hadoopConfiguration)
-
private class MetadataCache {
+ // `FileStatus` objects of all "_metadata" files.
private var metadataStatuses: Array[FileStatus] = _
+
+ // `FileStatus` objects of all "_common_metadata" files.
private var commonMetadataStatuses: Array[FileStatus] = _
+
+ // Parquet footer cache.
private var footers: Map[FileStatus, Footer] = _
- private var parquetSchema: StructType = _
+ // `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
+
+ // Partition spec of this table, including names, data types, and values of each partition
+ // column, and paths of each partition.
var partitionSpec: PartitionSpec = _
+
+ // Schema of the actual Parquet files, without partition columns discovered from partition
+ // directory paths.
+ var parquetSchema: StructType = _
+
+ // Schema of the whole table, including partition columns.
var schema: StructType = _
- var dataSchemaIncludesPartitionKeys: Boolean = _
+ // Indicates whether partition columns are also included in Parquet data file schema. If not,
+ // we need to fill in partition column values into read rows when scanning the table.
+ var partitionKeysIncludedInParquetSchema: Boolean = _
+
+ def prepareMetadata(path: Path, schema: StructType, conf: Configuration): Unit = {
+ conf.set(
+ ParquetOutputFormat.COMPRESSION,
+ ParquetRelation
+ .shortParquetCompressionCodecNames
+ .getOrElse(
+ sqlContext.conf.parquetCompressionCodec.toUpperCase,
+ CompressionCodecName.UNCOMPRESSED).name())
+
+ ParquetRelation.enableLogForwarding()
+ ParquetTypesConverter.writeMetaData(schema.toAttributes, path, conf)
+ }
+
+ /**
+ * Refreshes `FileStatus`es, footers, partition spec, and table schema.
+ */
def refresh(): Unit = {
- val baseStatuses = {
- val statuses = paths.distinct.map(p => fs.getFileStatus(fs.makeQualified(new Path(p))))
- // Support either reading a collection of raw Parquet part-files, or a collection of folders
- // containing Parquet files (e.g. partitioned Parquet table).
- assert(statuses.forall(!_.isDir) || statuses.forall(_.isDir))
- statuses.toArray
- }
+ val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+
+ // Support either reading a collection of raw Parquet part-files, or a collection of folders
+ // containing Parquet files (e.g. partitioned Parquet table).
+ val baseStatuses = paths.distinct.map { p =>
+ val qualified = fs.makeQualified(new Path(p))
+
+ if (!fs.exists(qualified) && maybeSchema.isDefined) {
+ fs.mkdirs(qualified)
+ prepareMetadata(qualified, maybeSchema.get, sparkContext.hadoopConfiguration)
+ }
+
+ fs.getFileStatus(qualified)
+ }.toArray
+ assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
+ // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = baseStatuses.flatMap { f =>
- val statuses = SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+ SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}
- assert(statuses.nonEmpty, s"${f.getPath} is an empty folder.")
- statuses
}
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
@@ -225,13 +270,14 @@ case class ParquetRelation2
f -> new Footer(f.getPath, parquetMetadata)
}.seq.toMap
- partitionSpec = {
- val partitionDirs = dataStatuses
+ partitionSpec = maybePartitionSpec.getOrElse {
+ val partitionDirs = leaves
.filterNot(baseStatuses.contains)
.map(_.getPath.getParent)
.distinct
if (partitionDirs.nonEmpty) {
+ // Parses names and values of partition columns, and infers their data types.
ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName)
} else {
// No partition directories found, makes an empty specification
@@ -241,20 +287,22 @@ case class ParquetRelation2
parquetSchema = maybeSchema.getOrElse(readSchema())
- dataSchemaIncludesPartitionKeys =
+ partitionKeysIncludedInParquetSchema =
isPartitioned &&
- partitionColumns.forall(f => metadataCache.parquetSchema.fieldNames.contains(f.name))
+ partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
schema = {
- val fullParquetSchema = if (dataSchemaIncludesPartitionKeys) {
- metadataCache.parquetSchema
+ val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
+ parquetSchema
} else {
- StructType(metadataCache.parquetSchema.fields ++ partitionColumns.fields)
+ StructType(parquetSchema.fields ++ partitionColumns.fields)
}
+ // If this Parquet relation is converted from a Hive Metastore table, we must reconcile the case
+ // insensitivity issue and possible schema mismatches.
maybeMetastoreSchema
- .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullParquetSchema))
- .getOrElse(fullParquetSchema)
+ .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
+ .getOrElse(fullRelationSchema)
}
}
@@ -303,13 +351,17 @@ case class ParquetRelation2
@transient private val metadataCache = new MetadataCache
metadataCache.refresh()
- private def partitionColumns = metadataCache.partitionSpec.partitionColumns
+ def partitionSpec = metadataCache.partitionSpec
- private def partitions = metadataCache.partitionSpec.partitions
+ def partitionColumns = metadataCache.partitionSpec.partitionColumns
- private def isPartitioned = partitionColumns.nonEmpty
+ def partitions = metadataCache.partitionSpec.partitions
- private def dataSchemaIncludesPartitionKeys = metadataCache.dataSchemaIncludesPartitionKeys
+ def isPartitioned = partitionColumns.nonEmpty
+
+ private def partitionKeysIncludedInDataSchema = metadataCache.partitionKeysIncludedInParquetSchema
+
+ private def parquetSchema = metadataCache.parquetSchema
override def schema = metadataCache.schema
@@ -412,18 +464,21 @@ case class ParquetRelation2
// When the data does not include the key and the key is requested then we must fill it in
// based on information from the input split.
- if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
+ if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
val partValues = selectedPartitions.collectFirst {
case p if split.getPath.getParent.toString == p.path => p.values
}.get
+ val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
+
iterator.map { pair =>
val row = pair._2.asInstanceOf[SpecificMutableRow]
var i = 0
- while (i < partValues.size) {
+ while (i < requiredPartOrdinal.size) {
// TODO Avoids boxing cost here!
- row.update(partitionKeyLocations(i), partValues(i))
+ val partOrdinal = requiredPartOrdinal(i)
+ row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
i += 1
}
row
@@ -457,6 +512,8 @@ case class ParquetRelation2
}
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ assert(paths.size == 1, s"Can't write to multiple destinations: ${paths.mkString(",")}")
+
// TODO: currently we do not check whether the "schema"s are compatible
// That means if one first creates a table and then INSERTs data with
// and incompatible schema the execution will fail. It would be nice
@@ -464,7 +521,7 @@ case class ParquetRelation2
// before calling execute().
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
+ val writeSupport = if (parquetSchema.map(_.dataType).forall(_.isPrimitive)) {
log.debug("Initializing MutableRowWriteSupport")
classOf[MutableRowWriteSupport]
} else {
@@ -474,7 +531,7 @@ case class ParquetRelation2
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
val conf = ContextUtil.getConfiguration(job)
- RowWriteSupport.setSchema(schema.toAttributes, conf)
+ RowWriteSupport.setSchema(data.schema.toAttributes, conf)
val destinationPath = new Path(paths.head)
@@ -544,14 +601,12 @@ object ParquetRelation2 {
// Whether we should merge schemas collected from all Parquet part-files.
val MERGE_SCHEMA = "mergeSchema"
- // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
- val METASTORE_SCHEMA = "metastoreSchema"
-
- // Default partition name to use when the partition column value is null or empty string
+ // Default partition name to use when the partition column value is null or empty string.
val DEFAULT_PARTITION_NAME = "partition.defaultName"
- // When true, the Parquet data source caches Parquet metadata for performance
- val CACHE_METADATA = "cacheMetadata"
+ // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
+ // internally.
+ private[sql] val METASTORE_SCHEMA = "metastoreSchema"
private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
footers.map { footer =>
@@ -579,6 +634,15 @@ object ParquetRelation2 {
}
}
+ /**
+ * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
+ * schema and Parquet schema.
+ *
+ * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
+ * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
+ * distinguish binary and string). This method generates a correct schema by merging Metastore
+ * schema data types and Parquet schema field names.
+ */
private[parquet] def mergeMetastoreParquetSchema(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
@@ -719,16 +783,15 @@ object ParquetRelation2 {
* }}}
*/
private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
- val distinctColNamesOfPartitions = values.map(_.columnNames).distinct
- val columnCount = values.head.columnNames.size
-
// Column names of all partitions must match
- assert(distinctColNamesOfPartitions.size == 1, {
- val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
+ val distinctPartitionsColNames = values.map(_.columnNames).distinct
+ assert(distinctPartitionsColNames.size == 1, {
+ val list = distinctPartitionsColNames.mkString("\t", "\n", "")
s"Conflicting partition column names detected:\n$list"
})
// Resolves possible type conflicts for each column
+ val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
}
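To make the `DefaultSource` options documented above concrete, here is a hedged usage sketch (the table name and path are placeholders; only `path` and `mergeSchema` are shown since they are the options spelled out in the new Scaladoc):

    // Registers a temporary table backed by the new Parquet data source and queries it.
    sqlContext.sql(
      """
        |CREATE TEMPORARY TABLE logs
        |USING org.apache.spark.sql.parquet
        |OPTIONS (
        |  path '/tmp/logs',
        |  mergeSchema 'true'
        |)
      """.stripMargin)

    sqlContext.sql("SELECT count(*) FROM logs").collect()

For programmatic writes, `SaveMode` semantics follow the `(mode, pathExists)` match in `createRelation` above: `ErrorIfExists` fails when the path already exists, `Ignore` skips the write in that case, and `Append`/`Overwrite` always insert.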
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index f8117c2177..eb2d5f2529 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.parquet
+import org.scalatest.BeforeAndAfterAll
import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}
@@ -40,7 +41,7 @@ import org.apache.spark.sql.{Column, DataFrame, QueryTest, SQLConf}
* 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred
* data type is nullable.
*/
-class ParquetFilterSuite extends QueryTest with ParquetTest {
+class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
private def checkFilterPredicate(
@@ -112,210 +113,224 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
}
- def run(prefix: String): Unit = {
- test(s"$prefix: filter pushdown - boolean") {
- withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd =>
- checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
+ test("filter pushdown - boolean") {
+ withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
- checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
- checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
- }
+ checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
+ checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
}
+ }
- test(s"$prefix: filter pushdown - short") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit rdd =>
- checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
- checkFilterPredicate(
- Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
-
- checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1)
- checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4)
- checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
- checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
-
- checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq[_]], 1)
- checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt[_]], 1)
- checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt[_]], 4)
- checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1)
- checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4)
-
- checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate(
- Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, classOf[Operators.And], 3)
- checkFilterPredicate(
- Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3,
- classOf[Operators.Or],
- Seq(Row(1), Row(4)))
- }
+ test("filter pushdown - short") {
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit rdd =>
+ checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate(
+ Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+
+ checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1)
+ checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4)
+ checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4)
+
+ checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate(
+ Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, classOf[Operators.And], 3)
+ checkFilterPredicate(
+ Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3,
+ classOf[Operators.Or],
+ Seq(Row(1), Row(4)))
}
+ }
- test(s"$prefix: filter pushdown - integer") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
- checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+ test("filter pushdown - integer") {
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
- checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+ checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
- checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
- checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
- checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
- checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
- checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
- checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
- checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
- checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
- checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
- checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
- }
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
+ }
- test(s"$prefix: filter pushdown - long") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd =>
- checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+ test("filter pushdown - long") {
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
- checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+ checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
- checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
- checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
- checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
- checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
- checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
- checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
- checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
- checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
- checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
- checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
- }
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
+ }
- test(s"$prefix: filter pushdown - float") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd =>
- checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+ test("filter pushdown - float") {
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
- checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+ checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
- checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
- checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
- checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
- checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
- checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
- checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
- checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
- checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
- checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
- checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
- }
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
+ }
- test(s"$prefix: filter pushdown - double") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd =>
- checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
+ test("filter pushdown - double") {
+ withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
- checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
+ checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+ checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
- checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
- checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
- checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
- checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+ checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+ checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+ checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
- checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
- checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
- checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
- checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
- checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+ checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+ checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+ checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+ checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+ checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
- checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
- checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
- checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
- }
+ checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+ checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+ checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
}
+ }
- test(s"$prefix: filter pushdown - string") {
- withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
- checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(
- '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
-
- checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
- checkFilterPredicate(
- '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
-
- checkFilterPredicate('_1 < "2", classOf[Lt[_]], "1")
- checkFilterPredicate('_1 > "3", classOf[Gt[_]], "4")
- checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
- checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
-
- checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
- checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
- checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
- checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
- checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
-
- checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
- checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
- checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
- }
+ test("filter pushdown - string") {
+ withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
+ checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(
+ '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
+
+ checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
+ checkFilterPredicate(
+ '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
+
+ checkFilterPredicate('_1 < "2", classOf[Lt[_]], "1")
+ checkFilterPredicate('_1 > "3", classOf[Gt[_]], "4")
+ checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
+ checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
+
+ checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
+ checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
+ checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
+ checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
+ checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
+
+ checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
+ checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
+ checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
}
+ }
- test(s"$prefix: filter pushdown - binary") {
- implicit class IntToBinary(int: Int) {
- def b: Array[Byte] = int.toString.getBytes("UTF-8")
- }
+ test("filter pushdown - binary") {
+ implicit class IntToBinary(int: Int) {
+ def b: Array[Byte] = int.toString.getBytes("UTF-8")
+ }
- withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
- checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
+ withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
+ checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
- checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkBinaryFilterPredicate(
- '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.b)).toSeq)
+ checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkBinaryFilterPredicate(
+ '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.b)).toSeq)
- checkBinaryFilterPredicate(
- '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.b)).toSeq)
+ checkBinaryFilterPredicate(
+ '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.b)).toSeq)
- checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt[_]], 1.b)
- checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt[_]], 4.b)
- checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
- checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
+ checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt[_]], 1.b)
+ checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt[_]], 4.b)
+ checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
+ checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
- checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
- checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
- checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
- checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
- checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
+ checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
+ checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
+ checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
+ checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
+ checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
- checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
- checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
- checkBinaryFilterPredicate(
- '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
- }
+ checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
+ checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
+ checkBinaryFilterPredicate(
+ '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
}
}
+}
+
+class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {
+ val originalConf = sqlContext.conf.parquetUseDataSourceApi
+
+ override protected def beforeAll(): Unit = {
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ }
+
+ override protected def afterAll(): Unit = {
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
+class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {
+ val originalConf = sqlContext.conf.parquetUseDataSourceApi
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
- run("Parquet data source enabled")
+ override protected def beforeAll(): Unit = {
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
}
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
- run("Parquet data source disabled")
+ override protected def afterAll(): Unit = {
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index c306330818..208f35761b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -21,6 +21,9 @@ import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.scalatest.BeforeAndAfterAll
import parquet.example.data.simple.SimpleGroup
import parquet.example.data.{Group, GroupWriter}
import parquet.hadoop.api.WriteSupport
@@ -30,16 +33,13 @@ import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf}
-import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is after all not intended to be used in this way?)
@@ -64,10 +64,11 @@ private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteS
/**
* A test suite that tests basic Parquet I/O.
*/
-class ParquetIOSuite extends QueryTest with ParquetTest {
-
+class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
+ import sqlContext.implicits.localSeqToDataFrameHolder
+
/**
* Writes `data` to a Parquet file, reads it back and check file contents.
*/
@@ -75,229 +76,281 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
withParquetRDD(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
}
- def run(prefix: String): Unit = {
- test(s"$prefix: basic data types (without binary)") {
- val data = (1 to 4).map { i =>
- (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
- }
- checkParquetFile(data)
+ test("basic data types (without binary)") {
+ val data = (1 to 4).map { i =>
+ (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
}
+ checkParquetFile(data)
+ }
- test(s"$prefix: raw binary") {
- val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
- withParquetRDD(data) { rdd =>
- assertResult(data.map(_._1.mkString(",")).sorted) {
- rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
- }
+ test("raw binary") {
+ val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
+ withParquetRDD(data) { rdd =>
+ assertResult(data.map(_._1.mkString(",")).sorted) {
+ rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
}
}
+ }
- test(s"$prefix: string") {
- val data = (1 to 4).map(i => Tuple1(i.toString))
- // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
- // as we store Spark SQL schema in the extra metadata.
- withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
- withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
- }
+ test("string") {
+ val data = (1 to 4).map(i => Tuple1(i.toString))
+ // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
+ // as we store Spark SQL schema in the extra metadata.
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+ }
- test(s"$prefix: fixed-length decimals") {
-
- def makeDecimalRDD(decimal: DecimalType): DataFrame =
- sparkContext
- .parallelize(0 to 1000)
- .map(i => Tuple1(i / 100.0))
- .toDF
- // Parquet doesn't allow column names with spaces, have to add an alias here
- .select($"_1" cast decimal as "dec")
-
- for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
- withTempPath { dir =>
- val data = makeDecimalRDD(DecimalType(precision, scale))
- data.saveAsParquetFile(dir.getCanonicalPath)
- checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
- }
+ test("fixed-length decimals") {
+
+ def makeDecimalRDD(decimal: DecimalType): DataFrame =
+ sparkContext
+ .parallelize(0 to 1000)
+ .map(i => Tuple1(i / 100.0))
+ .toDF
+ // Parquet doesn't allow column names with spaces, have to add an alias here
+ .select($"_1" cast decimal as "dec")
+
+ for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
+ withTempPath { dir =>
+ val data = makeDecimalRDD(DecimalType(precision, scale))
+ data.saveAsParquetFile(dir.getCanonicalPath)
+ checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
}
+ }
- // Decimals with precision above 18 are not yet supported
- intercept[RuntimeException] {
- withTempPath { dir =>
- makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
- parquetFile(dir.getCanonicalPath).collect()
- }
+ // Decimals with precision above 18 are not yet supported
+ intercept[RuntimeException] {
+ withTempPath { dir =>
+ makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
+ parquetFile(dir.getCanonicalPath).collect()
}
+ }
- // Unlimited-length decimals are not yet supported
- intercept[RuntimeException] {
- withTempPath { dir =>
- makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
- parquetFile(dir.getCanonicalPath).collect()
- }
+ // Unlimited-length decimals are not yet supported
+ intercept[RuntimeException] {
+ withTempPath { dir =>
+ makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
+ parquetFile(dir.getCanonicalPath).collect()
}
}
+ }
+
+ test("map") {
+ val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
+ checkParquetFile(data)
+ }
+
+ test("array") {
+ val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
+ checkParquetFile(data)
+ }
- test(s"$prefix: map") {
- val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
- checkParquetFile(data)
+ test("struct") {
+ val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
+ withParquetRDD(data) { rdd =>
+ // Structs are converted to `Row`s
+ checkAnswer(rdd, data.map { case Tuple1(struct) =>
+ Row(Row(struct.productIterator.toSeq: _*))
+ })
}
+ }
- test(s"$prefix: array") {
- val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
- checkParquetFile(data)
+ test("nested struct with array of array as field") {
+ val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
+ withParquetRDD(data) { rdd =>
+ // Structs are converted to `Row`s
+ checkAnswer(rdd, data.map { case Tuple1(struct) =>
+ Row(Row(struct.productIterator.toSeq: _*))
+ })
}
+ }
- test(s"$prefix: struct") {
- val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
- withParquetRDD(data) { rdd =>
- // Structs are converted to `Row`s
- checkAnswer(rdd, data.map { case Tuple1(struct) =>
- Row(Row(struct.productIterator.toSeq: _*))
- })
- }
+ test("nested map with struct as value type") {
+ val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
+ withParquetRDD(data) { rdd =>
+ checkAnswer(rdd, data.map { case Tuple1(m) =>
+ Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+ })
}
+ }
- test(s"$prefix: nested struct with array of array as field") {
- val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
- withParquetRDD(data) { rdd =>
- // Structs are converted to `Row`s
- checkAnswer(rdd, data.map { case Tuple1(struct) =>
- Row(Row(struct.productIterator.toSeq: _*))
- })
- }
+ test("nulls") {
+ val allNulls = (
+ null.asInstanceOf[java.lang.Boolean],
+ null.asInstanceOf[Integer],
+ null.asInstanceOf[java.lang.Long],
+ null.asInstanceOf[java.lang.Float],
+ null.asInstanceOf[java.lang.Double])
+
+ withParquetRDD(allNulls :: Nil) { rdd =>
+ val rows = rdd.collect()
+ assert(rows.size === 1)
+ assert(rows.head === Row(Seq.fill(5)(null): _*))
}
+ }
- test(s"$prefix: nested map with struct as value type") {
- val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
- withParquetRDD(data) { rdd =>
- checkAnswer(rdd, data.map { case Tuple1(m) =>
- Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
- })
- }
+ test("nones") {
+ val allNones = (
+ None.asInstanceOf[Option[Int]],
+ None.asInstanceOf[Option[Long]],
+ None.asInstanceOf[Option[String]])
+
+ withParquetRDD(allNones :: Nil) { rdd =>
+ val rows = rdd.collect()
+ assert(rows.size === 1)
+ assert(rows.head === Row(Seq.fill(3)(null): _*))
}
+ }
- test(s"$prefix: nulls") {
- val allNulls = (
- null.asInstanceOf[java.lang.Boolean],
- null.asInstanceOf[Integer],
- null.asInstanceOf[java.lang.Long],
- null.asInstanceOf[java.lang.Float],
- null.asInstanceOf[java.lang.Double])
-
- withParquetRDD(allNulls :: Nil) { rdd =>
- val rows = rdd.collect()
- assert(rows.size === 1)
- assert(rows.head === Row(Seq.fill(5)(null): _*))
- }
+ test("compression codec") {
+ def compressionCodecFor(path: String) = {
+ val codecs = ParquetTypesConverter
+ .readMetaData(new Path(path), Some(configuration))
+ .getBlocks
+ .flatMap(_.getColumns)
+ .map(_.getCodec.name())
+ .distinct
+
+ assert(codecs.size === 1)
+ codecs.head
}
- test(s"$prefix: nones") {
- val allNones = (
- None.asInstanceOf[Option[Int]],
- None.asInstanceOf[Option[Long]],
- None.asInstanceOf[Option[String]])
+ val data = (0 until 10).map(i => (i, i.toString))
- withParquetRDD(allNones :: Nil) { rdd =>
- val rows = rdd.collect()
- assert(rows.size === 1)
- assert(rows.head === Row(Seq.fill(3)(null): _*))
+ def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+ withParquetFile(data) { path =>
+ assertResult(conf.parquetCompressionCodec.toUpperCase) {
+ compressionCodecFor(path)
+ }
+ }
}
}
- test(s"$prefix: compression codec") {
- def compressionCodecFor(path: String) = {
- val codecs = ParquetTypesConverter
- .readMetaData(new Path(path), Some(configuration))
- .getBlocks
- .flatMap(_.getColumns)
- .map(_.getCodec.name())
- .distinct
-
- assert(codecs.size === 1)
- codecs.head
- }
+ // Checks default compression codec
+ checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
- val data = (0 until 10).map(i => (i, i.toString))
+ checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+ checkCompressionCodec(CompressionCodecName.GZIP)
+ checkCompressionCodec(CompressionCodecName.SNAPPY)
+ }
- def checkCompressionCodec(codec: CompressionCodecName): Unit = {
- withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
- withParquetFile(data) { path =>
- assertResult(conf.parquetCompressionCodec.toUpperCase) {
- compressionCodecFor(path)
- }
- }
- }
+ test("read raw Parquet file") {
+ def makeRawParquetFile(path: Path): Unit = {
+ val schema = MessageTypeParser.parseMessageType(
+ """
+ |message root {
+ | required boolean _1;
+ | required int32 _2;
+ | required int64 _3;
+ | required float _4;
+ | required double _5;
+ |}
+ """.stripMargin)
+
+ val writeSupport = new TestGroupWriteSupport(schema)
+ val writer = new ParquetWriter[Group](path, writeSupport)
+
+ (0 until 10).foreach { i =>
+ val record = new SimpleGroup(schema)
+ record.add(0, i % 2 == 0)
+ record.add(1, i)
+ record.add(2, i.toLong)
+ record.add(3, i.toFloat)
+ record.add(4, i.toDouble)
+ writer.write(record)
}
- // Checks default compression codec
- checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
+ writer.close()
+ }
- checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
- checkCompressionCodec(CompressionCodecName.GZIP)
- checkCompressionCodec(CompressionCodecName.SNAPPY)
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+ makeRawParquetFile(path)
+ checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
+ Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+ })
}
+ }
- test(s"$prefix: read raw Parquet file") {
- def makeRawParquetFile(path: Path): Unit = {
- val schema = MessageTypeParser.parseMessageType(
- """
- |message root {
- | required boolean _1;
- | required int32 _2;
- | required int64 _3;
- | required float _4;
- | required double _5;
- |}
- """.stripMargin)
-
- val writeSupport = new TestGroupWriteSupport(schema)
- val writer = new ParquetWriter[Group](path, writeSupport)
-
- (0 until 10).foreach { i =>
- val record = new SimpleGroup(schema)
- record.add(0, i % 2 == 0)
- record.add(1, i)
- record.add(2, i.toLong)
- record.add(3, i.toFloat)
- record.add(4, i.toDouble)
- writer.write(record)
- }
+ test("write metadata") {
+ withTempPath { file =>
+ val path = new Path(file.toURI.toString)
+ val fs = FileSystem.getLocal(configuration)
+ val attributes = ScalaReflection.attributesFor[(Int, String)]
+ ParquetTypesConverter.writeMetaData(attributes, path, configuration)
- writer.close()
- }
+ assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
+ assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
- withTempDir { dir =>
- val path = new Path(dir.toURI.toString, "part-r-0.parquet")
- makeRawParquetFile(path)
- checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
- Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
- })
- }
+ val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration))
+ val actualSchema = metaData.getFileMetaData.getSchema
+ val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+
+ actualSchema.checkContains(expectedSchema)
+ expectedSchema.checkContains(actualSchema)
}
+ }
- test(s"$prefix: write metadata") {
- withTempPath { file =>
- val path = new Path(file.toURI.toString)
- val fs = FileSystem.getLocal(configuration)
- val attributes = ScalaReflection.attributesFor[(Int, String)]
- ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+ test("save - overwrite") {
+ withParquetFile((1 to 10).map(i => (i, i.toString))) { file =>
+ val newData = (11 to 20).map(i => (i, i.toString))
+ newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Overwrite, Map("path" -> file))
+ checkAnswer(parquetFile(file), newData.map(Row.fromTuple))
+ }
+ }
- assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
- assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
+ test("save - ignore") {
+ val data = (1 to 10).map(i => (i, i.toString))
+ withParquetFile(data) { file =>
+ val newData = (11 to 20).map(i => (i, i.toString))
+ newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Ignore, Map("path" -> file))
+ checkAnswer(parquetFile(file), data.map(Row.fromTuple))
+ }
+ }
- val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration))
- val actualSchema = metaData.getFileMetaData.getSchema
- val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+ test("save - throw") {
+ val data = (1 to 10).map(i => (i, i.toString))
+ withParquetFile(data) { file =>
+ val newData = (11 to 20).map(i => (i, i.toString))
+ val errorMessage = intercept[Throwable] {
+ newData.toDF().save(
+ "org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> file))
+ }.getMessage
+ assert(errorMessage.contains("already exists"))
+ }
+ }
- actualSchema.checkContains(expectedSchema)
- expectedSchema.checkContains(actualSchema)
- }
+ test("save - append") {
+ val data = (1 to 10).map(i => (i, i.toString))
+ withParquetFile(data) { file =>
+ val newData = (11 to 20).map(i => (i, i.toString))
+ newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Append, Map("path" -> file))
+ checkAnswer(parquetFile(file), (data ++ newData).map(Row.fromTuple))
}
}
+}
+
+class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
+ val originalConf = sqlContext.conf.parquetUseDataSourceApi
+
+ override protected def beforeAll(): Unit = {
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ }
+
+ override protected def afterAll(): Unit = {
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
+class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
+ val originalConf = sqlContext.conf.parquetUseDataSourceApi
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
- run("Parquet data source enabled")
+ override protected def beforeAll(): Unit = {
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
}
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
- run("Parquet data source disabled")
+ override protected def afterAll(): Unit = {
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
}
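
The `save - overwrite/ignore/throw/append` cases above exercise the new write path through `DataFrame.save`. As a rough companion sketch (not taken from the patch itself; `sqlContext`, `df`, and `path` are assumed to be an existing SQLContext, a DataFrame, and a writable directory), the four SaveModes behave as follows when targeting the Parquet data source:

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

// Illustrative sketch only; `sqlContext`, `df`, and `path` are assumptions.
def saveModesDemo(sqlContext: SQLContext, df: DataFrame, path: String): Unit = {
  // ErrorIfExists: succeeds only when `path` does not exist yet.
  df.save("org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> path))

  // Append: keeps the existing rows and adds the new ones.
  df.save("org.apache.spark.sql.parquet", SaveMode.Append, Map("path" -> path))

  // Overwrite: replaces whatever is currently stored under `path`.
  df.save("org.apache.spark.sql.parquet", SaveMode.Overwrite, Map("path" -> path))

  // Ignore: silently does nothing, because `path` already exists at this point.
  df.save("org.apache.spark.sql.parquet", SaveMode.Ignore, Map("path" -> path))

  // Read back whatever the last effective write left behind.
  sqlContext.parquetFile(path).collect()
}
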
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index ae606d11a8..3bf0116c8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -19,17 +19,24 @@ package org.apache.spark.sql.parquet
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.Path
-import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.parquet.ParquetRelation2._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{QueryTest, Row, SQLContext}
-class ParquetPartitionDiscoverySuite extends FunSuite with ParquetTest {
+// The data where the partitioning key exists only in the directory structure.
+case class ParquetData(intField: Int, stringField: String)
+
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)
+
+class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestSQLContext
+ import sqlContext._
+
val defaultPartitionName = "__NULL__"
test("column type inference") {
@@ -113,6 +120,17 @@ class ParquetPartitionDiscoverySuite extends FunSuite with ParquetTest {
Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
check(Seq(
+ s"hdfs://host:9000/path/a=10/b=20",
+ s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType))),
+ Seq(
+ Partition(Row(10, "20"), s"hdfs://host:9000/path/a=10/b=20"),
+ Partition(Row(null, "hello"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"))))
+
+ check(Seq(
s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
PartitionSpec(
@@ -123,4 +141,182 @@ class ParquetPartitionDiscoverySuite extends FunSuite with ParquetTest {
Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
}
+
+ test("read partitioned table - normal case") {
+ withTempDir { base =>
+ for {
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", "bar")
+ } {
+ makeParquetFile(
+ (1 to 10).map(i => ParquetData(i, i.toString)),
+ makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+ }
+
+ parquetFile(base.getCanonicalPath).registerTempTable("t")
+
+ withTempTable("t") {
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", "bar")
+ } yield Row(i, i.toString, pi, ps))
+
+ checkAnswer(
+ sql("SELECT intField, pi FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ _ <- Seq("foo", "bar")
+ } yield Row(i, pi))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE pi = 1"),
+ for {
+ i <- 1 to 10
+ ps <- Seq("foo", "bar")
+ } yield Row(i, i.toString, 1, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE ps = 'foo'"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ } yield Row(i, i.toString, pi, "foo"))
+ }
+ }
+ }
+
+ test("read partitioned table - partition key included in Parquet file") {
+ withTempDir { base =>
+ for {
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", "bar")
+ } {
+ makeParquetFile(
+ (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)),
+ makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+ }
+
+ parquetFile(base.getCanonicalPath).registerTempTable("t")
+
+ withTempTable("t") {
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", "bar")
+ } yield Row(i, pi, i.toString, ps))
+
+ checkAnswer(
+ sql("SELECT intField, pi FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ _ <- Seq("foo", "bar")
+ } yield Row(i, pi))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE pi = 1"),
+ for {
+ i <- 1 to 10
+ ps <- Seq("foo", "bar")
+ } yield Row(i, 1, i.toString, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE ps = 'foo'"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ } yield Row(i, pi, i.toString, "foo"))
+ }
+ }
+ }
+
+ test("read partitioned table - with nulls") {
+ withTempDir { base =>
+ for {
+ // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+ pi <- Seq(1, null.asInstanceOf[Integer])
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } {
+ makeParquetFile(
+ (1 to 10).map(i => ParquetData(i, i.toString)),
+ makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+ }
+
+ val parquetRelation = load(
+ "org.apache.spark.sql.parquet",
+ Map(
+ "path" -> base.getCanonicalPath,
+ ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
+
+ parquetRelation.registerTempTable("t")
+
+ withTempTable("t") {
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, null.asInstanceOf[Integer])
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } yield Row(i, i.toString, pi, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE pi IS NULL"),
+ for {
+ i <- 1 to 10
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } yield Row(i, i.toString, null, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE ps IS NULL"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, null.asInstanceOf[Integer])
+ } yield Row(i, i.toString, pi, null))
+ }
+ }
+ }
+
+ test("read partitioned table - with nulls and partition keys are included in Parquet file") {
+ withTempDir { base =>
+ for {
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } {
+ makeParquetFile(
+ (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)),
+ makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+ }
+
+ val parquetRelation = load(
+ "org.apache.spark.sql.parquet",
+ Map(
+ "path" -> base.getCanonicalPath,
+ ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
+
+ parquetRelation.registerTempTable("t")
+
+ withTempTable("t") {
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } yield Row(i, pi, i.toString, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE ps IS NULL"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ } yield Row(i, pi, i.toString, null))
+ }
+ }
+ }
}
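
The partition discovery tests above all generate the same Hive-style layout via `makePartitionDir` and then read it back. The sketch below is illustrative only, not part of the patch; `sqlContext` and `base` are assumed, and the layout comment mirrors what the tests produce:

import java.io.File

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.parquet.ParquetRelation2

// Expected on-disk layout (partition values are encoded in the path, Hive style):
//
//   base/pi=1/ps=foo/part-r-00001.parquet
//   base/pi=1/ps=bar/part-r-00001.parquet
//   base/pi=2/ps=__NULL__/part-r-00001.parquet   // null partition value
//
// Sketch only; `sqlContext` and `base` are assumptions.
def loadPartitionedTable(sqlContext: SQLContext, base: File) = {
  val df = sqlContext.load(
    "org.apache.spark.sql.parquet",
    Map(
      "path" -> base.getCanonicalPath,
      // Directory names matching this value are decoded as NULL partition values.
      ParquetRelation2.DEFAULT_PARTITION_NAME -> "__NULL__"))

  // `pi` and `ps` are not stored inside the Parquet files (see `ParquetData`);
  // they are recovered from the directory names and appended as extra columns.
  df.registerTempTable("t")
  sqlContext.sql("SELECT intField, stringField, pi, ps FROM t WHERE pi = 1")
}
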
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index cba06835f9..d0665450cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,103 +17,120 @@
package org.apache.spark.sql.parquet
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
-import org.apache.spark.sql.{QueryTest, SQLConf}
/**
* A test suite that tests various Parquet queries.
*/
-class ParquetQuerySuite extends QueryTest with ParquetTest {
+class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
- def run(prefix: String): Unit = {
- test(s"$prefix: simple projection") {
- withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
- checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
- }
+ test("simple projection") {
+ withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+ checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
}
+ }
- test(s"$prefix: appending") {
- val data = (0 until 10).map(i => (i, i.toString))
- withParquetTable(data, "t") {
- sql("INSERT INTO TABLE t SELECT * FROM t")
- checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
- }
+ test("appending") {
+ val data = (0 until 10).map(i => (i, i.toString))
+ withParquetTable(data, "t") {
+ sql("INSERT INTO TABLE t SELECT * FROM t")
+ checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
+ }
- // This test case will trigger the NPE mentioned in
- // https://issues.apache.org/jira/browse/PARQUET-151.
- ignore(s"$prefix: overwriting") {
- val data = (0 until 10).map(i => (i, i.toString))
- withParquetTable(data, "t") {
- sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
- checkAnswer(table("t"), data.map(Row.fromTuple))
- }
+ // This test case will trigger the NPE mentioned in
+ // https://issues.apache.org/jira/browse/PARQUET-151.
+ // Update: This also triggers SPARK-5746; we should re-enable it once both are fixed.
+ ignore("overwriting") {
+ val data = (0 until 10).map(i => (i, i.toString))
+ withParquetTable(data, "t") {
+ sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
+ checkAnswer(table("t"), data.map(Row.fromTuple))
}
+ }
- test(s"$prefix: self-join") {
- // 4 rows, cells of column 1 of row 2 and row 4 are null
- val data = (1 to 4).map { i =>
- val maybeInt = if (i % 2 == 0) None else Some(i)
- (maybeInt, i.toString)
- }
-
- withParquetTable(data, "t") {
- val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
- val queryOutput = selfJoin.queryExecution.analyzed.output
+ test("self-join") {
+ // 4 rows, cells of column 1 of row 2 and row 4 are null
+ val data = (1 to 4).map { i =>
+ val maybeInt = if (i % 2 == 0) None else Some(i)
+ (maybeInt, i.toString)
+ }
- assertResult(4, s"Field count mismatches")(queryOutput.size)
- assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
- queryOutput.filter(_.name == "_1").map(_.exprId).size
- }
+ withParquetTable(data, "t") {
+ val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+ val queryOutput = selfJoin.queryExecution.analyzed.output
- checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+ assertResult(4, "Field count mismatches")(queryOutput.size)
+ assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
+ queryOutput.filter(_.name == "_1").map(_.exprId).size
}
+
+ checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
}
+ }
- test(s"$prefix: nested data - struct with array field") {
- val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
- withParquetTable(data, "t") {
- checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
- case Tuple1((_, Seq(string))) => Row(string)
- })
- }
+ test("nested data - struct with array field") {
+ val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+ case Tuple1((_, Seq(string))) => Row(string)
+ })
}
+ }
- test(s"$prefix: nested data - array of struct") {
- val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
- withParquetTable(data, "t") {
- checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
- case Tuple1(Seq((_, string))) => Row(string)
- })
- }
+ test("nested data - array of struct") {
+ val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+ case Tuple1(Seq((_, string))) => Row(string)
+ })
}
+ }
- test(s"$prefix: SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
- withParquetTable((1 to 10).map(Tuple1.apply), "t") {
- checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
- }
+ test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+ withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+ checkAnswer(sql("SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
+ }
- test(s"$prefix: SPARK-5309 strings stored using dictionary compression in parquet") {
- withParquetTable((0 until 1000).map(i => ("same", "run_" + i /100, 1)), "t") {
+ test("SPARK-5309 strings stored using dictionary compression in parquet") {
+ withParquetTable((0 until 1000).map(i => ("same", "run_" + i / 100, 1)), "t") {
- checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
- (0 until 10).map(i => Row("same", "run_" + i, 100)))
+ checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
+ (0 until 10).map(i => Row("same", "run_" + i, 100)))
- checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
- List(Row("same", "run_5", 100)))
- }
+ checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
+ List(Row("same", "run_5", 100)))
}
}
+}
+
+class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
+ val originalConf = sqlContext.conf.parquetUseDataSourceApi
+
+ override protected def beforeAll(): Unit = {
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ }
+
+ override protected def afterAll(): Unit = {
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
+class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
+ val originalConf = sqlContext.conf.parquetUseDataSourceApi
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
- run("Parquet data source enabled")
+ override protected def beforeAll(): Unit = {
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
}
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
- run("Parquet data source disabled")
+ override protected def afterAll(): Unit = {
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
}
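
Both On/Off suite pairs follow the same pattern: pin `SQLConf.PARQUET_USE_DATA_SOURCE_API` in `beforeAll` and restore the saved value in `afterAll`. As a hedged sketch (not from the patch), the same toggle can be scoped around a single block of work; `some_parquet_table` below is a hypothetical table name:

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.test.TestSQLContext

// Sketch only; runs `body` against the legacy Parquet code path and then
// restores whatever value the flag had before.
def withOldParquetPath[T](body: => T): T = {
  val ctx = TestSQLContext
  val original = ctx.conf.parquetUseDataSourceApi
  ctx.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
  try body finally {
    ctx.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, original.toString)
  }
}

// Example usage (hypothetical table):
// withOldParquetPath {
//   TestSQLContext.sql("SELECT COUNT(*) FROM some_parquet_table").collect()
// }
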