author    | Takeshi YAMAMURO <linguin.m.s@gmail.com> | 2016-06-14 13:05:56 -0700
committer | Yin Huai <yhuai@databricks.com>          | 2016-06-14 13:05:56 -0700
commit    | dae4d5db21368faaa46fa8d1a256c27428694c2c (patch)
tree      | 9365fee0215cfea7b5926e1ac05c706746ed3757 /sql
parent    | bd39ffe35c6f939debe5d3c5eb4970b4e62507b0 (diff)
[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas
## What changes were proposed in this pull request?
This PR sets the default number of partitions used when reading Parquet schemas.
SQLContext#read#parquet currently yields at least n_executors * n_cores tasks even if the Parquet data consists of a single small file, which can increase the latency of small jobs.
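For context, the fan-out comes from `SparkContext#parallelize`, whose partition count defaults to `defaultParallelism` when none is given. A minimal standalone sketch of that behavior (the object name and file path are illustrative, not from the patch):

```scala
import org.apache.spark.sql.SparkSession

object SchemaReadFanOutDemo {
  def main(args: Array[String]): Unit = {
    // local[8]: defaultParallelism is 8 in local mode.
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("SchemaReadFanOutDemo")
      .getOrCreate()
    val sc = spark.sparkContext

    // One (path, length) pair, mimicking a single small Parquet file, but
    // parallelize() with no explicit partition count still creates
    // defaultParallelism partitions, so any action on it runs 8 tasks.
    val rdd = sc.parallelize(Seq(("part-00000.parquet", 1024L)))
    println(rdd.getNumPartitions) // 8
    println(rdd.count())          // 1, computed by 8 tasks (7 of them empty)

    spark.stop()
  }
}
```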
## How was this patch tested?
Manually tested and checked.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes #13137 from maropu/SPARK-15247.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 7
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 71c16008be..6b25e36f7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
           // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>
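To see what the clamp does, here is a small standalone sketch of the same arithmetic (the object and function names and the sample numbers are illustrative, not part of the patch):

```scala
object NumParallelismSketch {
  // Same arithmetic as the patch: at least 1 partition, at most
  // defaultParallelism, otherwise one partition per file.
  def numParallelism(numFiles: Int, defaultParallelism: Int): Int =
    Math.min(Math.max(numFiles, 1), defaultParallelism)

  def main(args: Array[String]): Unit = {
    // A single small file on a 16-core cluster: 1 task instead of 16.
    println(numParallelism(1, 16))    // 1
    // Many files: still capped at the default parallelism.
    println(numParallelism(1000, 16)) // 16
  }
}
```

With the patch applied, a schema merge over one small file issues a single task, while a large file set is still capped at `defaultParallelism` partitions.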