path: root/sql
author     hyukjinkwon <gurwls223@gmail.com>    2015-11-11 16:46:04 +0800
committer  Cheng Lian <lian@databricks.com>     2015-11-11 16:46:04 +0800
commit     1bc41125ee6306e627be212969854f639969c440 (patch)
tree       f2ddd4fdd429ed284f508086848dc1cba0f8be1c /sql
parent     99f5f988612b3093d73d9ce98819767e822fcbff (diff)
[SPARK-11500][SQL] Not deterministic order of columns when using merging schemas.
https://issues.apache.org/jira/browse/SPARK-11500

As filed in SPARK-11500, if merging schemas is enabled, the order in which files are touched can affect the ordering of the output columns. This was mostly caused by the use of `Set` and `Map`, so I replaced them with `LinkedHashSet` and `LinkedHashMap` to keep the insertion order.

Also, I changed `reduceOption` to `reduceLeftOption`, and changed the order of `filesToTouch` from `metadataStatuses ++ commonMetadataStatuses ++ needMerged` to `needMerged ++ metadataStatuses ++ commonMetadataStatuses`, so that the part-files, which always have the schema in their footers, are touched first, since the summary files might not exist.

One nit: if merging schemas is not enabled but multiple files are given, there is no guarantee of the output order, since there might not be a summary file for the first file, which would put the columns of the other files ahead. However, this should be okay, since disabling merging schemas means (assumes) all the files have the same schema.

In addition, the test code for this only checks the names of the fields.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9517 from HyukjinKwon/SPARK-11500.
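For background (not part of the patch itself), below is a minimal, self-contained Scala sketch of why both changes matter: Scala's default immutable `Set` iterates in hash order rather than insertion order, while `mutable.LinkedHashSet` preserves insertion order; and a left fold over an ordered file list keeps the first file's columns first. The `SchemaOrderSketch` object and its `mergeNames` helper are hypothetical illustrations, not Spark APIs; `mergeNames` only mimics how a schema merge keeps the left operand's columns ahead of newly encountered ones.

import scala.collection.mutable

object SchemaOrderSketch {
  // Hypothetical stand-in for a schema merge: keeps the left schema's
  // columns first and appends columns it has not seen yet.
  def mergeNames(left: Seq[String], right: Seq[String]): Seq[String] =
    left ++ right.filterNot(left.contains)

  def main(args: Array[String]): Unit = {
    val files = Seq("part-00001", "part-00002", "part-00003", "part-00004", "part-00005")

    // The default immutable Set iterates in hash order, so the "first"
    // file is not necessarily part-00001.
    println(Set(files: _*).toSeq)

    // LinkedHashSet iterates in insertion order, so part-00001 stays first.
    println(mutable.LinkedHashSet(files: _*).toSeq)

    // Folding schemas from the left over an ordered list pins the column
    // order to the lexicographically first part-file (cf. the test in this
    // patch: a/b, c/b, d/b merge to a, b, c, d).
    val schemas = Seq(Seq("a", "b"), Seq("c", "b"), Seq("d", "b"))
    println(schemas.reduceLeftOption(mergeNames))  // Some(List(a, b, c, d))
  }
}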
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 21
3 files changed, 55 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5a7c6b95b5..21337b2932 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -383,7 +383,7 @@ private[sql] class ParquetRelation(
var schema: StructType = _
// Cached leaves
- var cachedLeaves: Set[FileStatus] = null
+ var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null
/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
@@ -396,13 +396,13 @@ private[sql] class ParquetRelation(
!cachedLeaves.equals(currentLeafStatuses)
if (leafStatusesChanged) {
- cachedLeaves = currentLeafStatuses.toIterator.toSet
+ cachedLeaves = currentLeafStatuses
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = currentLeafStatuses.filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }.toArray
+ }.toArray.sortBy(_.getPath.toString)
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses =
@@ -465,13 +465,30 @@ private[sql] class ParquetRelation(
// You should enable this configuration only if you are very sure that for the parquet
// part-files to read there are corresponding summary files containing correct schema.
+ // As filed in SPARK-11500, the order in which files are touched can affect
+ // the ordering of the output columns. There are several things to mention here.
+ //
+ // 1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
+ // the first part-file so that the columns of the lexicographically first file appear
+ // first.
+ //
+ // 2. If mergeRespectSummaries config is true, then there should be, at least,
+ // "_metadata"s for all given files, so that we can ensure the columns of
+ // the lexicographically first file appear first.
+ //
+ // 3. If shouldMergeSchemas is false, but multiple files are given, there is
+ // no guarantee of the output order, since there might not be a summary file for the
+ // lexicographically first file, which ends up putting the columns of
+ // the other files first. However, this should be okay since not enabling
+ // shouldMergeSchemas means (assumes) all the files have the same schemas.
+
val needMerged: Seq[FileStatus] =
if (mergeRespectSummaries) {
Seq()
} else {
dataStatuses
}
- (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
+ needMerged ++ metadataStatuses ++ commonMetadataStatuses
} else {
// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
@@ -768,10 +785,10 @@ private[sql] object ParquetRelation extends Logging {
footers.map { footer =>
ParquetRelation.readSchemaFromFooter(footer, converter)
- }.reduceOption(_ merge _).iterator
+ }.reduceLeftOption(_ merge _).iterator
}.collect()
- partiallyMergedSchemas.reduceOption(_ merge _)
+ partiallyMergedSchemas.reduceLeftOption(_ merge _)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index e296d631f0..5b8841bc15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -428,11 +428,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private var _partitionSpec: PartitionSpec = _
private class FileStatusCache {
- var leafFiles = mutable.Map.empty[Path, FileStatus]
+ var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
- private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+ private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
} else {
@@ -450,10 +450,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
val (dirs, files) = statuses.partition(_.isDir)
+ // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
if (dirs.isEmpty) {
- files.toSet
+ mutable.LinkedHashSet(files: _*)
} else {
- files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
+ mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
}
}
}
@@ -464,7 +465,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
leafFiles.clear()
leafDirToChildrenFiles.clear()
- leafFiles ++= files.map(f => f.getPath -> f).toMap
+ leafFiles ++= files.map(f => f.getPath -> f)
leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
}
}
@@ -475,8 +476,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
cache
}
- protected def cachedLeafStatuses(): Set[FileStatus] = {
- fileStatusCache.leafFiles.values.toSet
+ protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
+ mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
}
final private[sql] def partitionSpec: PartitionSpec = {
@@ -834,7 +835,7 @@ private[sql] object HadoopFsRelation extends Logging {
def listLeafFilesInParallel(
paths: Array[String],
hadoopConf: Configuration,
- sparkContext: SparkContext): Set[FileStatus] = {
+ sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -854,9 +855,10 @@ private[sql] object HadoopFsRelation extends Logging {
status.getAccessTime)
}.collect()
- fakeStatuses.map { f =>
+ val hadoopFakeStatuses = fakeStatuses.map { f =>
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
- }.toSet
+ }
+ mutable.LinkedHashSet(hadoopFakeStatuses: _*)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index e2d754e806..e866493ee6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
+import org.apache.spark.sql._
import org.apache.spark.sql.types._
@@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
}
}
+
+ test("SPARK-11500: Not deterministic order of columns when using merging schemas.") {
+ import testImplicits._
+ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val pathOne = s"${dir.getCanonicalPath}/part=1"
+ Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
+ val pathTwo = s"${dir.getCanonicalPath}/part=2"
+ Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
+ val pathThree = s"${dir.getCanonicalPath}/part=3"
+ Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
+
+ // The schema consists of the leading columns of the first part-file
+ // in lexicographic order.
+ assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name)
+ === Seq("a", "b", "c", "d", "part"))
+ }
+ }
+ }
}