 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                                 |  6 ++++--
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala             | 16 ++++++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 75f87a5503..549257c0e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -300,7 +300,7 @@ object PartitioningAwareFileIndex extends Logging {
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
-    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
         (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
       }
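
The change from < to <= above makes the threshold inclusive: with a threshold of N, up to and including N paths are listed serially on the driver, and only N+1 or more paths trigger the distributed listing job. In particular, an empty set of paths with a threshold of 0 now short-circuits as well, which is exactly what the new FileIndexSuite test below exercises. A minimal Scala sketch of the boundary semantics (threshold and listsSerially are illustrative names, not Spark API):

    // Illustrative sketch only; listsSerially is a hypothetical helper.
    val threshold = 32  // mirrors the conf default, createWithDefault(32)
    def listsSerially(numPaths: Int): Boolean = numPaths <= threshold

    assert(listsSerially(0))    // zero paths: no distributed job is launched
    assert(listsSerially(32))   // exactly at the threshold: serial (was parallel with <)
    assert(!listsSerially(33))  // above the threshold: distributed listing job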
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc0f130406..461dfe3a66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -402,11 +402,13 @@ object SQLConf {
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
-        "of detected files exceeds this value during partition discovery, it tries to list the " +
+      .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
+        "of detected paths exceeds this value during partition discovery, it tries to list the " +
         "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
         "LibSVM data sources.")
       .intConf
+      .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
+        "files at driver side must not be negative")
       .createWithDefault(32)
 
   val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
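
The new checkValue above means a negative threshold is rejected as soon as the conf value is converted, instead of surfacing later during file listing. A hedged sketch of the resulting behavior, assuming spark is an existing SparkSession (validation firing at set time is my reading of how registered conf entries apply their converters):

    // Assumes spark is an existing SparkSession; messages per the checkValue above.
    spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "0")
    // OK: zero is allowed and, with the inclusive check, keeps all listing serial.

    // spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "-1")
    // Throws IllegalArgumentException: "The maximum number of paths allowed for
    // listing files at driver side must not be negative"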
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index efbfc2417d..7ea4064927 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileIndexSuite extends SharedSQLContext {
@@ -179,6 +180,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD " +
+    "is a nonpositive number") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+      new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
+    }
+
+    val e = intercept[IllegalArgumentException] {
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "-1") {
+        new InMemoryFileIndex(spark, Seq.empty, Map.empty, None)
+      }
+    }.getMessage
+    assert(e.contains("The maximum number of paths allowed for listing files at " +
+      "driver side must not be negative"))
+  }
+
   test("refresh for InMemoryFileIndex with FileStatusCache") {
     withTempDir { dir =>
       val fileStatusCache = FileStatusCache.getOrCreate(spark)