-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala     |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala   | 38
2 files changed, 33 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 5cee2b9af6..644e5d65d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -77,12 +77,12 @@ class ListingFileCatalog(
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
} else {
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass)
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logInfo(s"Listing $path on driver")
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
val statuses = {
val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
@@ -101,7 +101,8 @@ class ListingFileCatalog(
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not be
// a big deal since we always use `listLeafFilesInParallel` when the number of paths
// exceeds the threshold.
- case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+ case f =>
+ HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}.filterNot { status =>
val name = status.getPath.getName
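
A minimal driver-side sketch of the pattern the hunk above introduces: build the dummy JobConf and the PathFilter once and reuse them for every input path, instead of paying the JobConf construction cost inside the per-path loop. This is illustration only, not part of the patch; `hadoopConf` and `inputPaths` stand in for the catalog's fields, and the object name is hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import scala.util.Try

object DriverSideListingSketch {
  def listOnDriver(hadoopConf: Configuration, inputPaths: Seq[Path]): Seq[FileStatus] = {
    // Dummy JobConf created once; constructing it per path would repeatedly pay the
    // cost of locating the containing jar.
    val jobConf = new JobConf(hadoopConf, getClass)
    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    inputPaths.flatMap { path =>
      val fs = path.getFileSystem(hadoopConf)
      // Listing may fail for missing paths; fall back to an empty result.
      val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
      val kept = if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
      kept.toSeq
    }
  }
}
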
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index b516297115..8d332df029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -348,28 +348,40 @@ private[sql] object HadoopFsRelation extends Logging {
pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
}
+ /**
+ * Create a LocatedFileStatus using FileStatus and block locations.
+ */
+ def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
+ // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), which is
+ // very slow on some file systems (e.g. RawLocalFileSystem, which launches a subprocess and
+ // parses the stdout).
+ val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+ f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+ if (f.isSymlink) {
+ lfs.setSymlink(f.getSymlink)
+ }
+ lfs
+ }
+
// We don't filter files/directories whose name start with "_" except "_temporary" here, as
// specific data sources may take advantages over them (e.g. Parquet _metadata and
// _common_metadata files). "_temporary" directories are explicitly ignored since failed
// tasks/jobs may leave partial/corrupted data files there. Files and directories whose name
// start with "." are also ignored.
- def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+ def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
logInfo(s"Listing ${status.getPath}")
val name = status.getPath.getName.toLowerCase
if (shouldFilterOut(name)) {
Array.empty
} else {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(fs.getConf, this.getClass())
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
val statuses = {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+ val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
+ if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
}
statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus => f
- case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+ case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}
}
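
A minimal sketch of how the new helper can be used: convert a plain FileStatus into a LocatedFileStatus without going through the constructor that calls the slow FileStatus.getPermission(). This is illustrative only; the names `fs` and `dir` are assumptions, and the code is assumed to live in org.apache.spark.sql.execution.datasources so the private[sql] HadoopFsRelation object is visible.

import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path}

object LocateSketch {
  def locate(fs: FileSystem, dir: Path): Seq[LocatedFileStatus] =
    fs.listStatus(dir).toSeq.map {
      // Statuses that already carry block locations pass through untouched.
      case f: LocatedFileStatus => f
      // Plain statuses get their block locations fetched explicitly and are wrapped
      // by the permission-free helper added in the hunk above.
      case f =>
        HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
    }
}
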
@@ -403,9 +415,15 @@ private[sql] object HadoopFsRelation extends Logging {
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = paths.map(_.toString)
- val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path =>
- val fs = path.getFileSystem(serializableConfiguration.value)
- Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
+ val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions { paths =>
+ // Dummy jobconf to get to the pathFilter defined in configuration.
+ // Creating a JobConf is very expensive (ClassUtil.findContainingJar() is slow), so build
+ // it once per partition instead of once per path.
+ val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ paths.map(new Path(_)).flatMap { path =>
+ val fs = path.getFileSystem(serializableConfiguration.value)
+ Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).getOrElse(Array.empty)
+ }
}.map { status =>
val blockLocations = status match {
case f: LocatedFileStatus =>