aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala31
1 files changed, 31 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ffd7f6c750..f8d456dd6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -236,6 +236,37 @@ abstract class PartitioningAwareFileIndex(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
+
+ /**
+ * List leaf files of given paths. This method will submit a Spark job to do parallel
+ * listing whenever there is a path having more files than the parallel partition
+ * discovery threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val output = mutable.LinkedHashSet[FileStatus]() // cache hits first, then new listings
+ val pathsToFetch = mutable.ArrayBuffer[Path]() // paths with no entry in fileStatusCache
+ for (path <- paths) {
+ fileStatusCache.getLeafFiles(path) match {
+ case Some(files) => // cache hit: record the metric and reuse the cached statuses
+ HiveCatalogMetrics.incrementFileCacheHits(files.length)
+ output ++= files
+ case None => // cache miss: defer this path to the bulk listing below
+ pathsToFetch += path
+ }
+ () // unit value, so the loop body does not end on the match; NOTE(review): confirm why needed
+ }
+ val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+ val discovered = PartitioningAwareFileIndex.bulkListLeafFiles( // only the cache misses
+ pathsToFetch, hadoopConf, filter, sparkSession)
+ discovered.foreach { case (path, leafFiles) => // each entry pairs a path with its leaf files
+ HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+ fileStatusCache.putLeafFiles(path, leafFiles.toArray) // hand the new listing to the cache
+ output ++= leafFiles
+ }
+ output
+ }
}
object PartitioningAwareFileIndex {