aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTakeshi YAMAMURO <linguin.m.s@gmail.com>2016-06-14 13:05:56 -0700
committerYin Huai <yhuai@databricks.com>2016-06-14 13:05:56 -0700
commitdae4d5db21368faaa46fa8d1a256c27428694c2c (patch)
tree9365fee0215cfea7b5926e1ac05c706746ed3757
parentbd39ffe35c6f939debe5d3c5eb4970b4e62507b0 (diff)
downloadspark-dae4d5db21368faaa46fa8d1a256c27428694c2c.tar.gz
spark-dae4d5db21368faaa46fa8d1a256c27428694c2c.tar.bz2
spark-dae4d5db21368faaa46fa8d1a256c27428694c2c.zip
[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas
## What changes were proposed in this pull request? This pr sets the default number of partitions when reading parquet schemas. SQLContext#read#parquet currently yields at least n_executors * n_cores tasks even if parquet data consist of a single small file. This issue could increase the latency for small jobs. ## How was this patch tested? Manually tested and checked. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #13137 from maropu/SPARK-15247.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 71c16008be..6b25e36f7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
// side, and resemble fake `FileStatus`es there.
val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+ // Set the number of partitions to prevent following schema reads from generating many tasks
+ // in case of a small number of parquet files.
+ val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+ sparkSession.sparkContext.defaultParallelism)
+
// Issues a Spark job to read Parquet schema in parallel.
val partiallyMergedSchemas =
sparkSession
.sparkContext
- .parallelize(partialFileStatusInfo)
+ .parallelize(partialFileStatusInfo, numParallelism)
.mapPartitions { iterator =>
// Resembles fake `FileStatus`es with serialized path and length information.
val fakeFileStatuses = iterator.map { case (path, length) =>