diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-04-28 12:59:08 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-04-28 12:59:08 -0700 |
commit | 0ee5419b6ce535c714718d0d33b80eedd4b0a5fd (patch) | |
tree | 801fc6973ff7d81d924b32b3fac7dc356836f46c /sql | |
parent | d5ab42ceb94feb3645578bf459b104fb0b0d5682 (diff) | |
download | spark-0ee5419b6ce535c714718d0d33b80eedd4b0a5fd.tar.gz spark-0ee5419b6ce535c714718d0d33b80eedd4b0a5fd.tar.bz2 spark-0ee5419b6ce535c714718d0d33b80eedd4b0a5fd.zip |
[SPARK-14970][SQL] Prevent DataSource from enumerates all files in a directory if there is user specified schema
## What changes were proposed in this pull request?
The FileCatalog object gets created even if the user specifies schema, which means files in the directory is enumerated even thought its not necessary. For large directories this is very slow. User would want to specify schema in such scenarios of large dirs, and this defeats the purpose quite a bit.
## How was this patch tested?
Hard to test this with unit test.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #12748 from tdas/SPARK-14970.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 2f3826f72b..63dc1fd71e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -127,17 +127,16 @@ case class DataSource( } private def inferFileFormatSchema(format: FileFormat): StructType = { - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val allPaths = caseInsensitiveOptions.get("path") - val globbedPaths = allPaths.toSeq.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualified) - }.toArray - - val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None) userSpecifiedSchema.orElse { + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val allPaths = caseInsensitiveOptions.get("path") + val globbedPaths = allPaths.toSeq.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified) + }.toArray + val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None) format.inferSchema( sparkSession, caseInsensitiveOptions, |