author     Eric Liang <ekl@databricks.com>       2016-12-02 20:59:39 +0800
committer  Wenchen Fan <wenchen@databricks.com>  2016-12-02 20:59:39 +0800
commit     294163ee9319e4f7f6da1259839eb3c80bba25c2 (patch)
tree       5a70f4e59e29788c3ea0dfd0cd8db64713899ddd
parent     2159bf8b2c0239c111e2f96a86c155a8f584a3d9 (diff)
[SPARK-18679][SQL] Fix regression in file listing performance for non-catalog tables
## What changes were proposed in this pull request?

In Spark 2.1, ListingFileCatalog was significantly refactored (and renamed to InMemoryFileIndex). This introduced a regression where parallelism could only be introduced at the very top of the tree. However, in many cases (e.g. `spark.read.parquet(topLevelDir)`), the top of the tree is only a single directory.

This PR simplifies and fixes the parallel recursive listing code to allow parallelism to be introduced at any level during recursive descent (though note that once we decide to list a sub-tree in parallel, that sub-tree is listed in serial on executors).

cc mallman cloud-fan

## How was this patch tested?

Checked metrics in unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #16112 from ericl/spark-18679.
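For context, here is a minimal, self-contained sketch of the adaptive strategy the diff below implements. The names are illustrative (`ListingSketch`, `LeafFile`, a hard-coded `parallelThreshold`, a fixed parallelism cap); the real code reads the threshold from `sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold`, applies the input `PathFilter`, skips hidden/metadata paths, and ships the Hadoop configuration and block locations to and from executors via `SerializableConfiguration`/`SerializableFileStatus`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession

// Condensed sketch of the adaptive listing strategy; threshold and helpers are
// illustrative stand-ins, not the SQLConf-driven values used in the patch.
object ListingSketch extends Serializable {

  // Stand-in for the parallel partition discovery threshold.
  val parallelThreshold = 32

  // Minimal serializable stand-in for FileStatus, so listing results can be
  // shipped from executors back to the driver (the patch uses SerializableFileStatus).
  case class LeafFile(path: String, length: Long, modificationTime: Long)

  // Lists a batch of directories: serially on the driver when the batch is small,
  // otherwise as a Spark job with one serial listing task per sub-tree.
  def bulkListLeafFiles(
      paths: Seq[Path],
      conf: Configuration,
      session: SparkSession): Seq[(Path, Seq[FileStatus])] = {
    if (paths.size < parallelThreshold) {
      // Keep the session so deeper levels of the recursion can still fan out.
      paths.map(p => (p, listLeafFiles(p, conf, Some(session))))
    } else {
      // Distribute the sub-trees; executors list them serially (sessionOpt = None).
      // A default Configuration is built on each executor here; the real code ships
      // the driver's Hadoop conf via a SerializableConfiguration wrapper instead.
      session.sparkContext
        .parallelize(paths.map(_.toString), math.min(paths.size, 10000))
        .map { pathString =>
          val path = new Path(pathString)
          val leaves = listLeafFiles(path, new Configuration(), None)
          (pathString, leaves.map(s => LeafFile(s.getPath.toString, s.getLen, s.getModificationTime)))
        }
        .collect()
        .map { case (pathString, leaves) =>
          // Rebuild FileStatus objects on the driver from the serializable stand-ins.
          (new Path(pathString),
            leaves.map(f => new FileStatus(f.length, false, 0, 0, f.modificationTime, new Path(f.path))))
        }
        .toSeq
    }
  }

  // Lists one directory recursively. Child directories are fed back through
  // bulkListLeafFiles only while a session is available, i.e. on the driver.
  def listLeafFiles(
      path: Path,
      conf: Configuration,
      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
    val fs = path.getFileSystem(conf)
    val (dirs, files) = fs.listStatus(path).toSeq.partition(_.isDirectory)
    val nested = sessionOpt match {
      case Some(session) =>
        bulkListLeafFiles(dirs.map(_.getPath), conf, session).flatMap(_._2)
      case None =>
        dirs.flatMap(d => listLeafFiles(d.getPath, conf, None))
    }
    files ++ nested
  }
}
```

The key change mirrored here is that `listLeafFiles` recurses back through `bulkListLeafFiles` while a session is available, so a single top-level directory with many children can still trigger a parallel listing job one level down, while sub-trees handed to executors are listed serially.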
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala79
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala)53
3 files changed, 106 insertions, 34 deletions
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b433cd0a89..99ec78633a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -91,6 +91,12 @@ object HiveCatalogMetrics extends Source {
val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
/**
+ * Tracks the total number of Spark jobs launched for parallel file listing.
+ */
+ val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+ MetricRegistry.name("parallelListingJobCount"))
+
+ /**
* Resets the values of all metrics to zero. This is useful in tests.
*/
def reset(): Unit = {
@@ -98,6 +104,7 @@ object HiveCatalogMetrics extends Source {
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+ METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
}
// clients can use these to avoid classloader issues with the codahale classes
@@ -105,4 +112,5 @@ object HiveCatalogMetrics extends Source {
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+ def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 705a1e3198..825a0f70dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
pathsToFetch += path
}
}
- val discovered = if (pathsToFetch.length >=
- sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
- } else {
- PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
- }
+ val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+ val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+ pathsToFetch, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
blockLocations: Array[SerializableBlockLocation])
/**
- * List a collection of path recursively.
- */
- private def listLeafFilesInSerial(
- paths: Seq[Path],
- hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val filter = FileInputFormat.getInputPathFilter(jobConf)
-
- paths.map { path =>
- val fs = path.getFileSystem(hadoopConf)
- (path, listLeafFiles0(fs, path, filter))
- }
- }
-
- /**
- * List a collection of path recursively in parallel (using Spark executors).
- * Each task launched will use [[listLeafFilesInSerial]] to list.
+ * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+ * on the number of paths to list.
+ *
+ * This may only be called on the driver.
+ *
+ * @return for each input path, the set of discovered files for the path
*/
- private def listLeafFilesInParallel(
+ private def bulkListLeafFiles(
paths: Seq[Path],
hadoopConf: Configuration,
+ filter: PathFilter,
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
- assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+ // Short-circuits parallel listing when serial listing is likely to be faster.
+ if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ return paths.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+ }
+ }
+
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+ HiveCatalogMetrics.incrementParallelListingJobCount(1)
val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -324,9 +318,11 @@ object PartitioningAwareFileIndex extends Logging {
val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
+ .mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
- listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+ pathStrings.map(new Path(_)).toSeq.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, None))
+ }.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -374,11 +370,20 @@ object PartitioningAwareFileIndex extends Logging {
}
/**
- * List a single path, provided as a FileStatus, in serial.
+ * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+ * function may launch Spark jobs to parallelize listing.
+ *
+ * If sessionOpt is None, this may be called on executors.
+ *
+ * @return all children of path that match the specified filter.
*/
- private def listLeafFiles0(
- fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+ private def listLeafFiles(
+ path: Path,
+ hadoopConf: Configuration,
+ filter: PathFilter,
+ sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
logTrace(s"Listing $path")
+ val fs = path.getFileSystem(hadoopConf)
val name = path.getName.toLowerCase
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
@@ -393,9 +398,15 @@ object PartitioningAwareFileIndex extends Logging {
}
val allLeafStatuses = {
- val (dirs, files) = statuses.partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
- if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+ val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+ val nestedFiles: Seq[FileStatus] = sessionOpt match {
+ case Some(session) =>
+ bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+ case _ =>
+ dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+ }
+ val allFiles = topLevelFiles ++ nestedFiles
+ if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}
allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 56df1face6..b7a472b7f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext
@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
}
}
+ test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+ for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+ withTempDir { dir =>
+ val topLevelDirs = (1 to scale).map { i =>
+ val tmp = new File(dir, s"foo=$i.txt")
+ tmp.mkdir()
+ new Path(tmp.getCanonicalPath)
+ }
+ HiveCatalogMetrics.reset()
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+ }
+ }
+ }
+
+ test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+ for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+ withTempDir { dir =>
+ for (i <- 1 to scale) {
+ new File(dir, s"foo=$i.txt").mkdir()
+ }
+ HiveCatalogMetrics.reset()
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+ }
+ }
+ }
+
+ test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+ for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+ withTempDir { dir =>
+ for (i <- 1 to 2) {
+ val subdirA = new File(dir, s"a=$i")
+ subdirA.mkdir()
+ for (j <- 1 to 2) {
+ val subdirB = new File(subdirA, s"b=$j")
+ subdirB.mkdir()
+ for (k <- 1 to scale) {
+ new File(subdirB, s"foo=$k.txt").mkdir()
+ }
+ }
+ }
+ HiveCatalogMetrics.reset()
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+ new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+ }
+ }
+ }
+
test("PartitioningAwareFileIndex - file filtering") {
assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))