diff options
author | windpiger <songjun@outlook.com> | 2017-03-01 08:16:29 -0800 |
---|---|---|
committer | Xiao Li <gatorsmile@gmail.com> | 2017-03-01 08:16:29 -0800 |
commit | 8aa560b75e6b083b2a890c52301414285ba35c3d (patch) | |
tree | 168ffd04c98c49eeb18d6603d110545d87f1028b /sql/core/src/main/scala | |
parent | 5502a9cf883b2058209904c152e5d2c2a106b072 (diff) | |
download | spark-8aa560b75e6b083b2a890c52301414285ba35c3d.tar.gz spark-8aa560b75e6b083b2a890c52301414285ba35c3d.tar.bz2 spark-8aa560b75e6b083b2a890c52301414285ba35c3d.zip |
[SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed
## What changes were proposed in this pull request?
If we create a InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero, it will throw an exception:
```
Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```
## How was this patch tested?
unit test added
Author: windpiger <songjun@outlook.com>
Closes #17093 from windpiger/fixEmptiPathInBulkListFiles.
Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala | 2 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 |
2 files changed, 5 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 75f87a5503..549257c0e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -300,7 +300,7 @@ object PartitioningAwareFileIndex extends Logging { sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { // Short-circuits parallel listing when serial listing is likely to be faster. - if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { return paths.map { path => (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dc0f130406..461dfe3a66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -402,11 +402,13 @@ object SQLConf { val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold") - .doc("The maximum number of files allowed for listing files at driver side. If the number " + - "of detected files exceeds this value during partition discovery, it tries to list the " + + .doc("The maximum number of paths allowed for listing files at driver side. If the number " + + "of detected paths exceeds this value during partition discovery, it tries to list the " + "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " + "LibSVM data sources.") .intConf + .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " + + "files at driver side must not be negative") .createWithDefault(32) val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = |