-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala                              |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala  | 79
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala)  | 53
3 files changed, 106 insertions, 34 deletions
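In short, this change adds a metric counting Spark jobs launched for parallel file listing and merges the serial and parallel listing code paths into a single adaptive bulkListLeafFiles. The switch point is the existing parallel partition discovery threshold, read as sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold in the diff below. As a hedged aside, assuming the usual SQL config key for that setting, the threshold can be lowered to force the parallel path even for small directory sets:

    // Assumes a SparkSession named `spark` is in scope and that the threshold is exposed via
    // spark.sql.sources.parallelPartitionDiscovery.threshold; lowering it makes even small
    // directory sets take the parallel listing path counted by the new metric.
    spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "10")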
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b433cd0a89..99ec78633a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -91,6 +91,12 @@ object HiveCatalogMetrics extends Source {
val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
/**
+ * Tracks the total number of Spark jobs launched for parallel file listing.
+ */
+ val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+ MetricRegistry.name("parallelListingJobCount"))
+
+ /**
* Resets the values of all metrics to zero. This is useful in tests.
*/
def reset(): Unit = {
@@ -98,6 +104,7 @@ object HiveCatalogMetrics extends Source {
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+ METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
}
// clients can use these to avoid classloader issues with the codahale classes
@@ -105,4 +112,5 @@ object HiveCatalogMetrics extends Source {
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+ def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
}
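For orientation, a minimal sketch of how the new counter is meant to be exercised, following the reset-then-read pattern the tests below use; the direct incrementParallelListingJobCount call stands in for whatever code path actually launches a parallel listing job:

    import org.apache.spark.metrics.source.HiveCatalogMetrics

    // Start from zero so earlier activity does not leak into the reading.
    HiveCatalogMetrics.reset()
    assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)

    // Each parallel listing job bumps the counter by one.
    HiveCatalogMetrics.incrementParallelListingJobCount(1)
    assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)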
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 705a1e3198..825a0f70dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
pathsToFetch += path
}
}
- val discovered = if (pathsToFetch.length >=
- sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
- } else {
- PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
- }
+ val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+ val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+ pathsToFetch, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
blockLocations: Array[SerializableBlockLocation])
/**
- * List a collection of path recursively.
- */
- private def listLeafFilesInSerial(
- paths: Seq[Path],
- hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val filter = FileInputFormat.getInputPathFilter(jobConf)
-
- paths.map { path =>
- val fs = path.getFileSystem(hadoopConf)
- (path, listLeafFiles0(fs, path, filter))
- }
- }
-
- /**
- * List a collection of path recursively in parallel (using Spark executors).
- * Each task launched will use [[listLeafFilesInSerial]] to list.
+ * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+ * on the number of paths to list.
+ *
+ * This may only be called on the driver.
+ *
+ * @return for each input path, the set of discovered files for the path
*/
- private def listLeafFilesInParallel(
+ private def bulkListLeafFiles(
paths: Seq[Path],
hadoopConf: Configuration,
+ filter: PathFilter,
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
- assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+ // Short-circuits parallel listing when serial listing is likely to be faster.
+ if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ return paths.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+ }
+ }
+
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+ HiveCatalogMetrics.incrementParallelListingJobCount(1)
val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -324,9 +318,11 @@ object PartitioningAwareFileIndex extends Logging {
val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
+ .mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
- listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+ pathStrings.map(new Path(_)).toSeq.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, None))
+ }.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -374,11 +370,20 @@ object PartitioningAwareFileIndex extends Logging {
}
/**
- * List a single path, provided as a FileStatus, in serial.
+ * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+ * function may launch Spark jobs to parallelize listing.
+ *
+ * If sessionOpt is None, this may be called on executors.
+ *
+ * @return all children of path that match the specified filter.
*/
- private def listLeafFiles0(
- fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+ private def listLeafFiles(
+ path: Path,
+ hadoopConf: Configuration,
+ filter: PathFilter,
+ sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
logTrace(s"Listing $path")
+ val fs = path.getFileSystem(hadoopConf)
val name = path.getName.toLowerCase
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
@@ -393,9 +398,15 @@ object PartitioningAwareFileIndex extends Logging {
}
val allLeafStatuses = {
- val (dirs, files) = statuses.partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
- if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+ val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+ val nestedFiles: Seq[FileStatus] = sessionOpt match {
+ case Some(session) =>
+ bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+ case _ =>
+ dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+ }
+ val allFiles = topLevelFiles ++ nestedFiles
+ if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}
allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
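The structural point of the rewrite above is the sessionOpt handoff: on the driver, listLeafFiles can push large sets of child directories back through bulkListLeafFiles (which may launch another Spark job and bump the new counter), while on executors (sessionOpt = None) it simply recurses serially. A self-contained sketch of that shape, using plain java.io.File instead of Hadoop's FileSystem and a boolean in place of the SparkSession option; all names here are illustrative, not the real API:

    import java.io.File

    // Sketch only: `onDriver` stands in for sessionOpt.isDefined and `threshold` for
    // spark.sql.sources.parallelPartitionDiscovery.threshold. The real "driver + many dirs"
    // branch distributes the work with sparkContext.parallelize and increments
    // METRIC_PARALLEL_LISTING_JOB_COUNT; here both branches list locally.
    object AdaptiveListingSketch {
      def bulkList(dirs: Seq[File], threshold: Int, onDriver: Boolean): Seq[(File, Seq[File])] = {
        if (onDriver && dirs.size >= threshold) {
          // Parallel branch: children are listed as if on executors, i.e. without a session.
          dirs.map(d => (d, listLeaf(d, threshold, onDriver = false)))
        } else {
          // Serial branch: few paths, or already running on an executor.
          dirs.map(d => (d, listLeaf(d, threshold, onDriver)))
        }
      }

      def listLeaf(dir: File, threshold: Int, onDriver: Boolean): Seq[File] = {
        val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
        val (subdirs, files) = children.partition(_.isDirectory)
        // Nested directories go back through bulkList: on the driver this can trigger another
        // "job", on executors it stays a plain serial walk, mirroring the match on sessionOpt.
        files ++ bulkList(subdirs, threshold, onDriver).flatMap(_._2)
      }
    }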
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 56df1face6..b7a472b7f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext
@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
}
}
+ test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+ for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+ withTempDir { dir =>
+ val topLevelDirs = (1 to scale).map { i =>
+ val tmp = new File(dir, s"foo=$i.txt")
+ tmp.mkdir()
+ new Path(tmp.getCanonicalPath)
+ }
+ HiveCatalogMetrics.reset()
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+ }
+ }
+ }
+
+ test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+ for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+ withTempDir { dir =>
+ for (i <- 1 to scale) {
+ new File(dir, s"foo=$i.txt").mkdir()
+ }
+ HiveCatalogMetrics.reset()
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+ }
+ }
+ }
+
+ test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+ for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+ withTempDir { dir =>
+ for (i <- 1 to 2) {
+ val subdirA = new File(dir, s"a=$i")
+ subdirA.mkdir()
+ for (j <- 1 to 2) {
+ val subdirB = new File(subdirA, s"b=$j")
+ subdirB.mkdir()
+ for (k <- 1 to scale) {
+ new File(subdirB, s"foo=$k.txt").mkdir()
+ }
+ }
+ }
+ HiveCatalogMetrics.reset()
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+ }
+ }
+ }
+
test("PartitioningAwareFileIndex - file filtering") {
assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))