author     Takeshi YAMAMURO <linguin.m.s@gmail.com>    2016-06-13 13:41:26 -0700
committer  Yin Huai <yhuai@databricks.com>             2016-06-13 13:41:26 -0700
commit     5ad4e32d46599ae1b8626f08aa97345d078c28d7 (patch)
tree       604dd944bf2ed054dfef6f4fc7a727d95f75870b
parent     3b7fb84cf88bcae56713fd56396db537fa18f2e5 (diff)
[SPARK-15530][SQL] Set #parallelism for file listing in listLeafFilesInParallel
## What changes were proposed in this pull request?

This PR caps the parallelism of the file listing in `listLeafFilesInParallel`, so that a large `defaultParallelism` no longer causes the listing job to generate far more tasks than there are paths to list.

## How was this patch tested?

Manually checked.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #13444 from maropu/SPARK-15530.
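The core of the change is a cap on the number of RDD partitions used for the listing job. A minimal, self-contained sketch of that idea (the `SparkSession` setup and path names below are illustrative, not the committed code):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: demonstrates capping the partition count of a parallelized
// path list, as the patch does inside listLeafFilesInParallel.
object ParallelismCapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("parallelism-cap-sketch")
      .getOrCreate()

    val serializedPaths = (1 to 50000).map(i => s"hdfs://nn/warehouse/t/part=$i")

    // Without an explicit numSlices, parallelize() falls back to
    // sc.defaultParallelism, which can be huge on a large cluster.
    val numParallelism = math.min(serializedPaths.size, 10000)
    val rdd = spark.sparkContext.parallelize(serializedPaths, numParallelism)

    println(s"partitions = ${rdd.getNumPartitions}") // 10000, not defaultParallelism
    spark.stop()
  }
}
```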
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala      2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala   13
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index dd3c96a792..7d2854aaad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -75,7 +75,7 @@ class ListingFileCatalog(
   protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
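Passing the `SparkSession` instead of the bare `SparkContext` is what gives the callee access to the SQL configuration as well as the context. A hypothetical standalone analogue (using the public `spark.conf` API and an assumed default of 32 as a stand-in for the internal `sessionState.conf.parallelPartitionDiscoveryThreshold`):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical analogue of the new signature: one SparkSession parameter
// yields both the SQL configuration and the SparkContext.
def listLeafFilesSketch(paths: Seq[String], spark: SparkSession): Long = {
  // Stand-in for sessionState.conf.parallelPartitionDiscoveryThreshold,
  // which is private to org.apache.spark.sql.
  val threshold = spark.conf
    .get("spark.sql.sources.parallelPartitionDiscovery.threshold", "32").toInt
  require(paths.size >= threshold, s"expected at least $threshold paths")
  spark.sparkContext
    .parallelize(paths, math.min(paths.size, 10000))
    .count()
}
```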
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 890e64db59..9c1898994c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -448,13 +448,22 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
+    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+    val sparkContext = sparkSession.sparkContext
+    val sqlConf = sparkSession.sessionState.conf
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
-    val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions { paths =>
+    // Cap the parallelism so that the file listing below does not generate too many tasks
+    // when defaultParallelism is large.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val fakeStatuses = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
       // Dummy jobconf to get to the pathFilter defined in configuration
       // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
       val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
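To see the effect of the cap, a small demo that contrasts the old and new partition counts (the inflated `spark.default.parallelism` merely mimics a large cluster; it is not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

// Demo: compares the partition count of the listing RDD before and after
// the patch, under an artificially large default parallelism.
object ListingParallelismDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.default.parallelism", "20000") // mimic a big cluster
      .appName("listing-parallelism-demo")
      .getOrCreate()
    val sc = spark.sparkContext

    val serializedPaths = (1 to 100).map(i => s"hdfs://nn/warehouse/t/part=$i")

    val before = sc.parallelize(serializedPaths) // old call: uses defaultParallelism
    val after = sc.parallelize(serializedPaths, // patched call: capped by #paths
      math.min(serializedPaths.size, 10000))

    println(s"defaultParallelism = ${sc.defaultParallelism}") // 20000
    println(s"before = ${before.getNumPartitions}")           // 20000 tasks
    println(s"after  = ${after.getNumPartitions}")            // 100 tasks
    spark.stop()
  }
}
```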