author     Cheng Lian <lian@databricks.com>          2015-07-20 16:42:43 -0700
committer  Michael Armbrust <michael@databricks.com> 2015-07-20 16:42:43 -0700
commit     a1064df0ee3daf496800be84293345a10e1497d9 (patch)
tree       22157890103df1ad143d2964828306bcad0c09ff /core
parent     dac7dbf5a6bd663ef3acaa5b2249b31140fa6857 (diff)
[SPARK-8125] [SQL] Accelerates Parquet schema merging and partition discovery
This PR tries to accelerate Parquet schema discovery and `HadoopFsRelation` partition discovery. The acceleration is done by the following means:

- Turning off schema merging by default

  Schema merging is not the most common case, but requires reading footers of all Parquet part-files and can be very slow.

- Avoiding `FileSystem.globStatus()` calls when possible

  `FileSystem.globStatus()` may issue multiple synchronous RPC calls, and can be very slow (especially on S3). This PR adds `SparkHadoopUtil.globPathIfNecessary()`, which only issues RPC calls when the path contains glob-pattern specific character(s) (`{}[]*?\`). This is especially useful when converting a metastore Parquet table with lots of partitions, since Spark SQL adds all partition directories as the input paths, and currently we do a `globStatus` call on each input path sequentially.

- Listing leaf files in parallel when the number of input paths exceeds a threshold

  Listing leaf files is required by partition discovery. Currently it is done on the driver side, and can be slow when there are lots of (nested) directories, since each `FileSystem.listStatus()` call issues an RPC. In this PR, we list leaf files in a BFS style, and resort to a Spark job once we find that the number of directories that need to be listed exceeds a threshold. The threshold is controlled by the `SQLConf` option `spark.sql.sources.parallelPartitionDiscovery.threshold`, which defaults to 32. (A simplified sketch of this fan-out idea follows the commit message.)

- Discovering Parquet schema in parallel

  Currently, schema merging is also done on the driver side, and needs to read footers of all part-files. This PR uses a Spark job to do schema merging. Together with task-side metadata reading in Parquet 1.7.0, we never read any footers on the driver side now.

Author: Cheng Lian <lian@databricks.com>

Closes #7396 from liancheng/accel-parquet and squashes the following commits:

5598efc [Cheng Lian] Uses ParquetInputFormat[InternalRow] instead of ParquetInputFormat[Row]
ff32cd0 [Cheng Lian] Excludes directories while listing leaf files
3c580f1 [Cheng Lian] Fixes test failure caused by making "mergeSchema" default to "false"
b1646aa [Cheng Lian] Should allow empty input paths
32e5f0d [Cheng Lian] Moves schema merging to executor side
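[Editor's note] As context for the parallel-listing bullet above, here is a minimal Scala sketch of the threshold-based fan-out idea. It is not the code from this patch: it lists only a single directory level (the actual implementation walks nested directories BFS-style), and the object name, method name, and hard-coded threshold are assumptions made for illustration only.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkContext

    object ParallelListingSketch {
      // Illustrative constant mirroring the default of
      // spark.sql.sources.parallelPartitionDiscovery.threshold.
      val threshold = 32

      def listLeafFiles(sc: SparkContext, conf: Configuration, dirs: Seq[Path]): Seq[String] = {
        if (dirs.length >= threshold) {
          // Many directories: fan the listing out as a Spark job, so each task
          // issues its own FileSystem.listStatus() RPCs instead of the driver
          // doing them all sequentially. Paths travel as strings because
          // neither Path nor FileStatus is java.io.Serializable.
          sc.parallelize(dirs.map(_.toString), dirs.length).flatMap { dir =>
            val path = new Path(dir)
            // Hadoop Configuration is not serializable either; recreate it per task.
            val fs = path.getFileSystem(new Configuration())
            fs.listStatus(path).filterNot(_.isDirectory).map(_.getPath.toString)
          }.collect().toSeq
        } else {
          // Few directories: listing on the driver is cheap enough.
          dirs.flatMap { dir =>
            val fs = dir.getFileSystem(conf)
            fs.listStatus(dir).filterNot(_.isDirectory).map(_.getPath.toString)
          }
        }
      }
    }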
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  8
1 file changed, 8 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6b14d407a6..e06b06e06f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -239,6 +239,14 @@ class SparkHadoopUtil extends Logging {
     }.getOrElse(Seq.empty[Path])
   }
 
+  def globPathIfNecessary(pattern: Path): Seq[Path] = {
+    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
+      globPath(pattern)
+    } else {
+      Seq(pattern)
+    }
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
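
[Editor's note] For illustration, a hedged usage sketch of the new helper added above. It assumes access to a `SparkHadoopUtil` instance (e.g. via `SparkHadoopUtil.get`); the example paths are made up, and whether the class is reachable from user code depends on the Spark version.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.deploy.SparkHadoopUtil

    val util = SparkHadoopUtil.get

    // No glob characters in the path: it is returned as-is,
    // and no globStatus() RPC is issued.
    val plain = util.globPathIfNecessary(new Path("/data/table/part=1"))
    // plain == Seq(new Path("/data/table/part=1"))

    // Contains '*': falls through to globPath(), which calls
    // FileSystem.globStatus() to expand the pattern.
    val expanded = util.globPathIfNecessary(new Path("/data/table/part=*"))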