-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala | 66
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala | 61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala | 217
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala | 225
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala | 34
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
13 files changed, 304 insertions(+), 354 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7dccbbd3f0..073d2b1512 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 623d2be55d..fdd1fa3648 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -431,7 +431,7 @@ case class FileSourceScanExec(
private def createBucketedReadRDD(
bucketSpec: BucketSpec,
readFile: (PartitionedFile) => Iterator[InternalRow],
- selectedPartitions: Seq[Partition],
+ selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val bucketed =
@@ -463,7 +463,7 @@ case class FileSourceScanExec(
*/
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
- selectedPartitions: Seq[Partition],
+ selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
new file mode 100644
index 0000000000..2bc66ceeeb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+
+/**
+ * A collection of data files from a partitioned relation, along with the partition values in the
+ * form of an [[InternalRow]].
+ */
+case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the root paths of a relation as well as the
+ * partitions of a relation subject to some pruning expressions.
+ */
+trait FileCatalog {
+
+ /**
+ * Returns the list of root input paths from which the catalog will get files. There may be a
+ * single root path from which partitions are discovered, or individual partitions may be
+ * specified by each path.
+ */
+ def rootPaths: Seq[Path]
+
+ /**
+ * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+ * unpartitioned, this will return a single partition with no partition values.
+ *
+ * @param filters The filters used to prune which partitions are returned. These filters must
+ * only refer to partition columns and this method will only return files
+ * where these predicates are guaranteed to evaluate to `true`. Thus, these
+ * filters will not need to be evaluated again on the returned data.
+ */
+ def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+
+ /**
+ * Returns the list of files that will be read when scanning this relation. This call may be
+ * very expensive for large tables.
+ */
+ def inputFiles: Array[String]
+
+ /** Refresh any cached file listings */
+ def refresh(): Unit
+
+ /** Sum of table file sizes, in bytes */
+ def sizeInBytes: Long
+}
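The new trait above is the whole contract a relation's file listing has to satisfy. For reference, here is a minimal sketch of an implementation over a fixed, pre-listed set of files; the StaticFileCatalog name and everything inside it are illustrative and not part of this patch.

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative only: an unpartitioned catalog over a fixed set of already-listed files.
class StaticFileCatalog(files: Seq[FileStatus]) extends FileCatalog {

  // Treat the distinct parent directories of the files as the root paths.
  override def rootPaths: Seq[Path] = files.map(_.getPath.getParent).distinct

  // Unpartitioned data: a single PartitionDirectory with empty partition values,
  // so the pruning filters have nothing to prune.
  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] =
    PartitionDirectory(InternalRow.empty, files) :: Nil

  override def inputFiles: Array[String] = files.map(_.getPath.toUri.toString).toArray

  // Nothing is cached, so refresh is a no-op.
  override def refresh(): Unit = ()

  override def sizeInBytes: Long = files.map(_.getLen).sum
}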
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index e7239ef91b..9d153cec73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -175,64 +175,3 @@ abstract class TextBasedFileFormat extends FileFormat {
codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
}
-
-/**
- * A collection of data files from a partitioned relation, along with the partition values in the
- * form of an [[InternalRow]].
- */
-case class Partition(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the root paths of a relation as well as the
- * partitions of a relation subject to some pruning expressions.
- */
-trait BasicFileCatalog {
-
- /**
- * Returns the list of root input paths from which the catalog will get files. There may be a
- * single root path from which partitions are discovered, or individual partitions may be
- * specified by each path.
- */
- def rootPaths: Seq[Path]
-
- /**
- * Returns all valid files grouped into partitions when the data is partitioned. If the data is
- * unpartitioned, this will return a single partition with no partition values.
- *
- * @param filters The filters used to prune which partitions are returned. These filters must
- * only refer to partition columns and this method will only return files
- * where these predicates are guaranteed to evaluate to `true`. Thus, these
- * filters will not need to be evaluated again on the returned data.
- */
- def listFiles(filters: Seq[Expression]): Seq[Partition]
-
- /** Returns the list of files that will be read when scanning this relation. */
- def inputFiles: Array[String]
-
- /** Refresh any cached file listings */
- def refresh(): Unit
-
- /** Sum of table file sizes, in bytes */
- def sizeInBytes: Long
-}
-
-/**
- * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
- * those, infer the relation's partition specification.
- */
-// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for
-// which it is safe to list all of its files?
-trait FileCatalog extends BasicFileCatalog {
-
- /** Returns the specification of the partitions inferred from the data. */
- def partitionSpec(): PartitionSpec
-
- /** Returns all the valid files. */
- def allFiles(): Seq[FileStatus]
-
- /** Returns the list of files that will be read when scanning this relation. */
- override def inputFiles: Array[String] =
- allFiles().map(_.getPath.toUri.toString).toArray
-
- override def sizeInBytes: Long = allFiles().map(_.getLen).sum
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index db889edf03..afad889808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
- * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
- location: BasicFileCatalog,
+ location: FileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
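With the rename, anything that satisfies FileCatalog can be passed as the `location` of a HadoopFsRelation. A hedged sketch of such a construction, assuming `spark` is an active SparkSession and reusing the hypothetical StaticFileCatalog from the earlier sketch; Spark itself builds these relations through DataSource.resolveRelation rather than by hand.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate()
val catalog: FileCatalog = new StaticFileCatalog(Seq.empty)  // from the earlier sketch

// ParquetFileFormat is just one possible FileFormat; the schemas are placeholders.
val relation = HadoopFsRelation(
  location = catalog,
  partitionSchema = StructType(Nil),
  dataSchema = StructType(Seq(StructField("value", StringType))),
  bucketSpec = None,
  fileFormat = new ParquetFileFormat,
  options = Map.empty)(spark)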
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index b2508115c2..5c8eff7ec4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -17,14 +17,21 @@
package org.apache.spark.sql.execution.datasources
+import java.io.FileNotFoundException
+
import scala.collection.mutable
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
/**
@@ -38,22 +45,24 @@ import org.apache.spark.sql.types.{StringType, StructType}
abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
- partitionSchema: Option[StructType])
- extends SessionFileCatalog(sparkSession) with FileCatalog {
+ partitionSchema: Option[StructType]) extends FileCatalog with Logging {
import PartitioningAwareFileCatalog.BASE_PATH_PARAM
- override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+ /** Returns the specification of the partitions inferred from the data. */
+ def partitionSpec(): PartitionSpec
+
+ protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
- override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+ override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
- Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+ PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
- case PartitionDirectory(values, path) =>
+ case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
@@ -63,14 +72,20 @@ abstract class PartitioningAwareFileCatalog(
// Directory does not exist, or has no children files
Nil
}
- Partition(values, files)
+ PartitionDirectory(values, files)
}
}
logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
selectedPartitions
}
- override def allFiles(): Seq[FileStatus] = {
+ /** Returns the list of files that will be read when scanning this relation. */
+ override def inputFiles: Array[String] =
+ allFiles().map(_.getPath.toUri.toString).toArray
+
+ override def sizeInBytes: Long = allFiles().map(_.getLen).sum
+
+ def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
// For each of the root input paths, get the list of files inside them
rootPaths.flatMap { path =>
@@ -139,7 +154,7 @@ abstract class PartitioningAwareFileCatalog(
private def prunePartitions(
predicates: Seq[Expression],
- partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
+ partitionSpec: PartitionSpec): Seq[PartitionPath] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
@@ -156,7 +171,7 @@ abstract class PartitioningAwareFileCatalog(
})
val selected = partitions.filter {
- case PartitionDirectory(values, _) => boundPredicate(values)
+ case PartitionPath(values, _) => boundPredicate(values)
}
logInfo {
val total = partitions.length
@@ -214,8 +229,186 @@ abstract class PartitioningAwareFileCatalog(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
+
+ /**
+ * List leaf files of given paths. This method will submit a Spark job to do parallel
+ * listing whenever there is a path having more files than the parallel partition discovery
+ * threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val files =
+ if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+ } else {
+ PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+ }
+
+ HiveCatalogMetrics.incrementFilesDiscovered(files.size)
+ mutable.LinkedHashSet(files: _*)
+ }
}
-object PartitioningAwareFileCatalog {
+object PartitioningAwareFileCatalog extends Logging {
val BASE_PATH_PARAM = "basePath"
+
+ /** A serializable variant of HDFS's BlockLocation. */
+ private case class SerializableBlockLocation(
+ names: Array[String],
+ hosts: Array[String],
+ offset: Long,
+ length: Long)
+
+ /** A serializable variant of HDFS's FileStatus. */
+ private case class SerializableFileStatus(
+ path: String,
+ length: Long,
+ isDir: Boolean,
+ blockReplication: Short,
+ blockSize: Long,
+ modificationTime: Long,
+ accessTime: Long,
+ blockLocations: Array[SerializableBlockLocation])
+
+ /**
+ * List a collection of paths recursively.
+ */
+ private def listLeafFilesInSerial(
+ paths: Seq[Path],
+ hadoopConf: Configuration): Seq[FileStatus] = {
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass)
+ val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+ paths.flatMap { path =>
+ val fs = path.getFileSystem(hadoopConf)
+ listLeafFiles0(fs, path, filter)
+ }
+ }
+
+ /**
+ * List a collection of paths recursively in parallel (using Spark executors).
+ * Each task launched will use [[listLeafFilesInSerial]] to list.
+ */
+ private def listLeafFilesInParallel(
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ sparkSession: SparkSession): Seq[FileStatus] = {
+ assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+ logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+ val sparkContext = sparkSession.sparkContext
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = paths.map(_.toString)
+
+ // Set the parallelism so that the following file listing does not generate too many tasks
+ // when defaultParallelism is large.
+ val numParallelism = Math.min(paths.size, 10000)
+
+ val statuses = sparkContext
+ .parallelize(serializedPaths, numParallelism)
+ .mapPartitions { paths =>
+ val hadoopConf = serializableConfiguration.value
+ listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+ }.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
+
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }.collect()
+
+ // Turn SerializableFileStatus back to Status
+ statuses.map { f =>
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+ blockLocations)
+ }
+ }
+
+ /**
+ * Recursively list the leaf files and directories under a single path, in serial.
+ */
+ private def listLeafFiles0(
+ fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+ logTrace(s"Listing $path")
+ val name = path.getName.toLowerCase
+ if (shouldFilterOut(name)) {
+ Seq.empty[FileStatus]
+ } else {
+ // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+ // Note that statuses only include FileStatus for the files and dirs directly under path,
+ // and does not include anything else recursively.
+ val statuses = try fs.listStatus(path) catch {
+ case _: FileNotFoundException =>
+ logWarning(s"The directory $path was not found. Was it deleted very recently?")
+ Array.empty[FileStatus]
+ }
+
+ val allLeafStatuses = {
+ val (dirs, files) = statuses.partition(_.isDirectory)
+ val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+ if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+ }
+
+ allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+ case f: LocatedFileStatus =>
+ f
+
+ // NOTE:
+ //
+ // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue an RPC for this method.
+ //
+ // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+ // be a big deal since we always use `listLeafFilesInParallel` when the number of
+ // paths exceeds the threshold.
+ case f =>
+ // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+ // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
+ // subprocess and parses the stdout).
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+ val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+ f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+ if (f.isSymlink) {
+ lfs.setSymlink(f.getSymlink)
+ }
+ lfs
+ }
+ }
+ }
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+ // We filter everything that starts with _ and ., except _common_metadata and _metadata
+ // because Parquet needs to find those metadata files from leaf files returned by this method.
+ // We should refactor this logic to not mix metadata files with data files.
+ ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+ !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+ }
}
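Because the leaf-file listing helpers now live here instead of in SessionFileCatalog, a concrete subclass only needs to supply the cached listing state and a partition spec. A rough sketch of such a subclass under those assumptions; ListingFileCatalog in Spark plays this role, with real partition inference and finer-grained caching.

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession

// Illustrative subclass: lists the root paths eagerly and treats the data as unpartitioned.
class SimpleListingCatalog(
    sparkSession: SparkSession,
    override val rootPaths: Seq[Path],
    parameters: Map[String, String])
  extends PartitioningAwareFileCatalog(sparkSession, parameters, None) {

  // Leaf files discovered under rootPaths, using the listLeafFiles helper defined above.
  @volatile private var cachedLeafFiles = listLeafFiles(rootPaths)

  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] =
    mutable.LinkedHashMap(cachedLeafFiles.toSeq.map(f => f.getPath -> f): _*)

  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] =
    cachedLeafFiles.toArray.groupBy(_.getPath.getParent)

  // No partition columns are inferred in this sketch.
  override def partitionSpec(): PartitionSpec = PartitionSpec.emptySpec

  override def refresh(): Unit = { cachedLeafFiles = listLeafFiles(rootPaths) }
}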
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 381261cf65..81bdabb7af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -34,8 +34,8 @@ import org.apache.spark.sql.types._
// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
-object PartitionDirectory {
- def apply(values: InternalRow, path: String): PartitionDirectory =
+object PartitionPath {
+ def apply(values: InternalRow, path: String): PartitionPath =
apply(values, new Path(path))
}
@@ -43,14 +43,14 @@ object PartitionDirectory {
* Holds a directory in a partitioned collection of files as well as the partition values
* in the form of a Row. Before scanning, the files at `path` need to be enumerated.
*/
-case class PartitionDirectory(values: InternalRow, path: Path)
+case class PartitionPath(values: InternalRow, path: Path)
case class PartitionSpec(
partitionColumns: StructType,
- partitions: Seq[PartitionDirectory])
+ partitions: Seq[PartitionPath])
object PartitionSpec {
- val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionDirectory])
+ val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath])
}
object PartitioningUtils {
@@ -142,7 +142,7 @@ object PartitioningUtils {
// Finally, we create `Partition`s based on paths and resolved partition values.
val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
case (PartitionValues(_, literals), (path, _)) =>
- PartitionDirectory(InternalRow.fromSeq(literals.map(_.value)), path)
+ PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path)
}
PartitionSpec(StructType(fields), partitions)
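After this rename a PartitionSpec is a list of PartitionPath entries rather than PartitionDirectory ones. A small illustrative construction; the column name and HDFS paths below are made up.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Two partitions of a table partitioned by a single integer column `year`.
val spec = PartitionSpec(
  partitionColumns = StructType(Seq(StructField("year", IntegerType))),
  partitions = Seq(
    PartitionPath(InternalRow(2015), "hdfs://nn/warehouse/t/year=2015"),
    PartitionPath(InternalRow(2016), "hdfs://nn/warehouse/t/year=2016")))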
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
deleted file mode 100644
index 4807a92c2e..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.FileNotFoundException
-
-import scala.collection.mutable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.SerializableConfiguration
-
-
-/**
- * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf
- * files in a list of HDFS paths.
- *
- * @param sparkSession a [[SparkSession]]
- * @param ignoreFileNotFound (see [[ListingFileCatalog]])
- */
-abstract class SessionFileCatalog(sparkSession: SparkSession)
- extends BasicFileCatalog with Logging {
- protected val hadoopConf: Configuration
-
- /**
- * List leaf files of given paths. This method will submit a Spark job to do parallel
- * listing whenever there is a path having more files than the parallel partition discovery
- * discovery threshold.
- *
- * This is publicly visible for testing.
- */
- def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- val files =
- if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
- } else {
- SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
- }
-
- HiveCatalogMetrics.incrementFilesDiscovered(files.size)
- mutable.LinkedHashSet(files: _*)
- }
-}
-
-object SessionFileCatalog extends Logging {
-
- /** A serializable variant of HDFS's BlockLocation. */
- private case class SerializableBlockLocation(
- names: Array[String],
- hosts: Array[String],
- offset: Long,
- length: Long)
-
- /** A serializable variant of HDFS's FileStatus. */
- private case class SerializableFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long,
- blockLocations: Array[SerializableBlockLocation])
-
- /**
- * List a collection of path recursively.
- */
- private def listLeafFilesInSerial(
- paths: Seq[Path],
- hadoopConf: Configuration): Seq[FileStatus] = {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val filter = FileInputFormat.getInputPathFilter(jobConf)
-
- paths.flatMap { path =>
- val fs = path.getFileSystem(hadoopConf)
- listLeafFiles0(fs, path, filter)
- }
- }
-
- /**
- * List a collection of path recursively in parallel (using Spark executors).
- * Each task launched will use [[listLeafFilesInSerial]] to list.
- */
- private def listLeafFilesInParallel(
- paths: Seq[Path],
- hadoopConf: Configuration,
- sparkSession: SparkSession): Seq[FileStatus] = {
- assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
- logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
- val sparkContext = sparkSession.sparkContext
- val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = paths.map(_.toString)
-
- // Set the number of parallelism to prevent following file listing from generating many tasks
- // in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, 10000)
-
- val statuses = sparkContext
- .parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
- val hadoopConf = serializableConfiguration.value
- listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
- }.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
- }.collect()
-
- // Turn SerializableFileStatus back to Status
- statuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
- }
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
- blockLocations)
- }
- }
-
- /**
- * List a single path, provided as a FileStatus, in serial.
- */
- private def listLeafFiles0(
- fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
- logTrace(s"Listing $path")
- val name = path.getName.toLowerCase
- if (shouldFilterOut(name)) {
- Seq.empty[FileStatus]
- } else {
- // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
- // Note that statuses only include FileStatus for the files and dirs directly under path,
- // and does not include anything else recursively.
- val statuses = try fs.listStatus(path) catch {
- case _: FileNotFoundException =>
- logWarning(s"The directory $path was not found. Was it deleted very recently?")
- Array.empty[FileStatus]
- }
-
- val allLeafStatuses = {
- val (dirs, files) = statuses.partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
- if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
- }
-
- allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
- case f: LocatedFileStatus =>
- f
-
- // NOTE:
- //
- // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
- // operations, calling `getFileBlockLocations` does no harm here since these file system
- // implementations don't actually issue RPC for this method.
- //
- // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
- // be a big deal since we always use to `listLeafFilesInParallel` when the number of
- // paths exceeds threshold.
- case f =>
- // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
- // which is very slow on some file system (RawLocalFileSystem, which is launch a
- // subprocess and parse the stdout).
- val locations = fs.getFileBlockLocations(f, 0, f.getLen)
- val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
- f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
- if (f.isSymlink) {
- lfs.setSymlink(f.getSymlink)
- }
- lfs
- }
- }
- }
-
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // We filter everything that starts with _ and ., except _common_metadata and _metadata
- // because Parquet needs to find those metadata files from leaf files returned by this method.
- // We should refactor this logic to not mix metadata files with data files.
- ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
- !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index a5c41b2445..5648ab480a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType
/**
- * A [[BasicFileCatalog]] for a metastore catalog table.
+ * A [[FileCatalog]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
* @param db the table's database name
@@ -38,10 +38,9 @@ class TableFileCatalog(
db: String,
table: String,
partitionSchema: Option[StructType],
- override val sizeInBytes: Long)
- extends SessionFileCatalog(sparkSession) {
+ override val sizeInBytes: Long) extends FileCatalog {
- override protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+ protected val hadoopConf = sparkSession.sessionState.newHadoopConf
private val externalCatalog = sparkSession.sharedState.externalCatalog
@@ -51,7 +50,7 @@ class TableFileCatalog(
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
- override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+ override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(filters).listFiles(Nil)
}
@@ -79,7 +78,7 @@ class TableFileCatalog(
case Some(schema) =>
val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
val partitions = selectedPartitions.map { p =>
- PartitionDirectory(p.toRow(schema), p.storage.locationUri.get)
+ PartitionPath(p.toRow(schema), p.storage.locationUri.get)
}
val partitionSpec = PartitionSpec(schema, partitions)
new PrunedTableFileCatalog(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 2695974b84..9c43169cbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -81,6 +81,16 @@ class FileCatalogSuite extends SharedSQLContext {
}
}
+ test("PartitioningAwareFileCatalog - file filtering") {
+ assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
+ assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
+ assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
+ assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
+ assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
+ assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
+ assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
+ }
+
test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
deleted file mode 100644
index df50958337..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.SparkFunSuite
-
-class SessionFileCatalogSuite extends SparkFunSuite {
-
- test("file filtering") {
- assert(!SessionFileCatalog.shouldFilterOut("abcd"))
- assert(SessionFileCatalog.shouldFilterOut(".ab"))
- assert(SessionFileCatalog.shouldFilterOut("_cd"))
-
- assert(!SessionFileCatalog.shouldFilterOut("_metadata"))
- assert(!SessionFileCatalog.shouldFilterOut("_common_metadata"))
- assert(SessionFileCatalog.shouldFilterOut("_ab_metadata"))
- assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata"))
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 2ef66baee1..f2a209e919 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -30,7 +30,8 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -632,10 +633,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) =>
- assert(location.partitionSpec === PartitionSpec.emptySpec)
+ case LogicalRelation(
+ HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) =>
+ assert(location.partitionSpec() === PartitionSpec.emptySpec)
}.getOrElse {
- fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
+ fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4a2aaa7d4f..16e1e37b2f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{Partition => _, _}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.types._