Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 21
3 files changed, 55 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5a7c6b95b5..21337b2932 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -383,7 +383,7 @@ private[sql] class ParquetRelation(
var schema: StructType = _
// Cached leaves
- var cachedLeaves: Set[FileStatus] = null
+ var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null
/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
@@ -396,13 +396,13 @@ private[sql] class ParquetRelation(
!cachedLeaves.equals(currentLeafStatuses)
if (leafStatusesChanged) {
- cachedLeaves = currentLeafStatuses.toIterator.toSet
+ cachedLeaves = currentLeafStatuses
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = currentLeafStatuses.filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }.toArray
+ }.toArray.sortBy(_.getPath.toString)
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses =
@@ -465,13 +465,30 @@ private[sql] class ParquetRelation(
// You should enable this configuration only if you are very sure that for the parquet
// part-files to read there are corresponding summary files containing correct schema.
+ // As reported in SPARK-11500, the order in which files are touched matters, because it can
+ // affect the ordering of the output columns. Several cases are worth noting here.
+ //
+ // 1. If the mergeRespectSummaries config is false, schemas are merged by reducing from
+ //    the first part-file, so the columns of the lexicographically first file come first.
+ //
+ // 2. If the mergeRespectSummaries config is true, there should be at least a "_metadata"
+ //    file for every given file, so that the columns of the lexicographically first file
+ //    are guaranteed to come first.
+ //
+ // 3. If shouldMergeSchemas is false but multiple files are given, there is no guarantee
+ //    of the output order, since the lexicographically first file might not have a summary
+ //    file and the columns of the other files may end up ahead of its columns. This should
+ //    be okay, though, because disabling shouldMergeSchemas assumes all files share the
+ //    same schema.
+
val needMerged: Seq[FileStatus] =
if (mergeRespectSummaries) {
Seq()
} else {
dataStatuses
}
- (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
+ needMerged ++ metadataStatuses ++ commonMetadataStatuses
} else {
// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
@@ -768,10 +785,10 @@ private[sql] object ParquetRelation extends Logging {
footers.map { footer =>
ParquetRelation.readSchemaFromFooter(footer, converter)
- }.reduceOption(_ merge _).iterator
+ }.reduceLeftOption(_ merge _).iterator
}.collect()
- partiallyMergedSchemas.reduceOption(_ merge _)
+ partiallyMergedSchemas.reduceLeftOption(_ merge _)
}
/**
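A side note on the ParquetRelation changes above: together with the comment block, the switch from reduceOption to reduceLeftOption pins the schema fold to start at the first (lexicographically smallest) part-file, which is what keeps its columns in front of the merged schema. Below is a minimal, self-contained sketch of that idea; mergeFields is a simplified stand-in for Spark's package-private StructType.merge, and the part-file schemas are hypothetical.

    // Simplified stand-in for StructType.merge: keep the fields already seen,
    // then append the fields of the right-hand schema that are new.
    def mergeFields(left: Seq[String], right: Seq[String]): Seq[String] =
      left ++ right.filterNot(left.contains)

    // Column names of three hypothetical part-files, listed in lexicographic order.
    val partFileSchemas = Seq(
      Seq("a", "b"),   // part=1
      Seq("c", "b"),   // part=2
      Seq("d", "b"))   // part=3

    // reduceLeftOption folds from the first element, so the columns of the
    // lexicographically first part-file stay in front.
    println(partFileSchemas.reduceLeftOption(mergeFields))   // Some(List(a, b, c, d))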
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index e296d631f0..5b8841bc15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -428,11 +428,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private var _partitionSpec: PartitionSpec = _
private class FileStatusCache {
- var leafFiles = mutable.Map.empty[Path, FileStatus]
+ var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
- private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+ private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
} else {
@@ -450,10 +450,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
val (dirs, files) = statuses.partition(_.isDir)
+ // [[LinkedHashSet]] is used here because the order of files can affect the results. (SPARK-11500)
if (dirs.isEmpty) {
- files.toSet
+ mutable.LinkedHashSet(files: _*)
} else {
- files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
+ mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
}
}
}
@@ -464,7 +465,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
leafFiles.clear()
leafDirToChildrenFiles.clear()
- leafFiles ++= files.map(f => f.getPath -> f).toMap
+ leafFiles ++= files.map(f => f.getPath -> f)
leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
}
}
@@ -475,8 +476,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
cache
}
- protected def cachedLeafStatuses(): Set[FileStatus] = {
- fileStatusCache.leafFiles.values.toSet
+ protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
+ mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
}
final private[sql] def partitionSpec: PartitionSpec = {
@@ -834,7 +835,7 @@ private[sql] object HadoopFsRelation extends Logging {
def listLeafFilesInParallel(
paths: Array[String],
hadoopConf: Configuration,
- sparkContext: SparkContext): Set[FileStatus] = {
+ sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -854,9 +855,10 @@ private[sql] object HadoopFsRelation extends Logging {
status.getAccessTime)
}.collect()
- fakeStatuses.map { f =>
+ val hadoopFakeStatuses = fakeStatuses.map { f =>
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
- }.toSet
+ }
+ mutable.LinkedHashSet(hadoopFakeStatuses: _*)
}
}
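A quick illustration (not part of the patch) of why the leaf-file cache above moves from Set/Map to LinkedHashSet/LinkedHashMap: the linked variants iterate in insertion order, so the listing order of files survives the cache and the downstream schema merge stays deterministic. The paths are made up.

    import scala.collection.mutable

    // Hypothetical leaf files in the order they were listed.
    val listed = Seq("part=1/f0.parquet", "part=2/f0.parquet", "part=3/f0.parquet")

    // A plain immutable Set makes no guarantee about iteration order ...
    val unordered: Set[String] = Set(listed: _*)

    // ... while LinkedHashSet always iterates in insertion order.
    val ordered = mutable.LinkedHashSet(listed: _*)
    println(ordered.toSeq)   // List(part=1/f0.parquet, part=2/f0.parquet, part=3/f0.parquet)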
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index e2d754e806..e866493ee6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
+import org.apache.spark.sql._
import org.apache.spark.sql.types._
@@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
}
}
+
+ test("SPARK-11500: Not deterministic order of columns when using merging schemas.") {
+ import testImplicits._
+ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val pathOne = s"${dir.getCanonicalPath}/part=1"
+ Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
+ val pathTwo = s"${dir.getCanonicalPath}/part=2"
+ Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
+ val pathThree = s"${dir.getCanonicalPath}/part=3"
+ Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
+
+ // The merged schema should start with the columns of the lexicographically first
+ // part-file, followed by the new columns of the remaining files and the partition column.
+ assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name)
+ === Seq("a", "b", "c", "d", "part"))
+ }
+ }
+ }
}
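For completeness, a hedged user-level sketch of what the new test exercises; the directory path is hypothetical, and the expected output assumes the same three part directories are written as in the test above.

    // With schema merging enabled, the merged column order should now follow the
    // lexicographically first part-file rather than whichever file happened to be
    // touched first.
    val df = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("/tmp/spark-11500-table")   // hypothetical directory with part=1, part=2, part=3
    println(df.schema.map(_.name))         // expected: List(a, b, c, d, part)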