Diffstat (limited to 'sql')
3 files changed, 101 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fdec4e..675e755cb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
     cachedPartitionSpec = null
   }
 
-  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+  /**
+   * List leaf files of the given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition
+   * discovery threshold.
+   */
+  protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
+      // Right now, the number of paths is less than the value of
+      // parallelPartitionDiscoveryThreshold, so we list file statuses at the driver.
+      // If there is any child that has more files than the threshold, we will use
+      // parallel listing.
+
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logTrace(s"Listing $path on driver")
-        Try {
-          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        }.getOrElse(Array.empty[FileStatus])
+
+        val childStatuses = {
+          // TODO: We need to avoid using Try here.
+          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        }
+
+        childStatuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here since these file
+          //   system implementations don't actually issue an RPC for this method.
+          //
+          // - Here we call `getFileBlockLocations` sequentially, but that should not be a
+          //   big deal since we always use `listLeafFilesInParallel` when the number of
+          //   paths exceeds the threshold.
+          case f =>
+            if (f.isDirectory) {
+              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
+              f
+            } else {
+              HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+            }
+        }
+      }.filterNot { status =>
+        val name = status.getPath.getName
+        HadoopFsRelation.shouldFilterOut(name)
+      }
+
+      val (dirs, files) = statuses.partition(_.isDirectory)
+
+      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
+      if (dirs.isEmpty) {
+        mutable.LinkedHashSet(files: _*)
+      } else {
+        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
       }
-      mutable.LinkedHashSet(statuses: _*)
     }
   }
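
For reference, a minimal, self-contained sketch of the driver-side listing loop this hunk introduces. The helper name listLeafFilesSketch is hypothetical; it uses the public LocatedFileStatus constructor where the patch calls HadoopFsRelation.createLocatedFileStatus, and it omits the path-filter and hidden-file pruning for brevity:

    import scala.collection.mutable
    import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path}

    // Hypothetical sketch of the patched logic: recurse into directories, and only
    // ever ask for block locations on regular files (the SPARK-14959 fix).
    def listLeafFilesSketch(fs: FileSystem, paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
      val statuses: Seq[FileStatus] = paths.flatMap { path =>
        fs.listStatus(path).map {
          case f: LocatedFileStatus => f  // block locations already attached
          case f if f.isDirectory => f    // never call getFileBlockLocations on a directory
          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
        }
      }
      val (dirs, files) = statuses.partition(_.isDirectory)
      // LinkedHashSet preserves insertion order, which can affect results (SPARK-11500).
      if (dirs.isEmpty) mutable.LinkedHashSet(files: _*)
      else mutable.LinkedHashSet(files: _*) ++ listLeafFilesSketch(fs, dirs.map(_.getPath))
    }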
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 521eb7ffcc..06adaf7112 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -392,13 +392,14 @@ private[sql] object HadoopFsRelation extends Logging {
     logTrace(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
     if (shouldFilterOut(name)) {
-      Array.empty
+      Array.empty[FileStatus]
     } else {
       val statuses = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
         val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
         if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
       }
+      // statuses does not contain any directories at this point.
       statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
 
@@ -460,7 +461,9 @@ private[sql] object HadoopFsRelation extends Logging {
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(serializableConfiguration.value)
-        Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).getOrElse(Array.empty)
+        // TODO: We need to avoid using Try here.
+        Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter))
+          .getOrElse(Array.empty[FileStatus])
       }
     }.map { status =>
       val blockLocations = status match {
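
The shouldFilterOut predicate used on both paths above prunes housekeeping entries by name before any block-location lookup happens. A hedged approximation of its behavior (the authoritative version lives in HadoopFsRelation in this same file; treat the exact rules below as an assumption):

    // Assumed approximation of HadoopFsRelation.shouldFilterOut: drop Hadoop/Spark
    // bookkeeping entries and hidden dot-files from listings.
    def shouldFilterOutSketch(pathName: String): Boolean = {
      pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
    }

    assert(shouldFilterOutSketch("_SUCCESS") && shouldFilterOutSketch(".hidden"))
    assert(!shouldFilterOutSketch("part-00000"))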
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 67ff257b93..8d8a18fa93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.SparkConf
@@ -375,6 +375,38 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }
 
+  test("SPARK-14959: Do not call getFileBlockLocations on directories") {
+    // Set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to 2 so that we first list file
+    // statuses at the driver side, and then, at the level of p2, list file statuses
+    // in parallel.
+    withSQLConf(
+      "fs.file.impl" -> classOf[MockDistributedFileSystem].getName,
+      SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2") {
+      withTempPath { path =>
+        val tempDir = path.getCanonicalPath
+
+        Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName =>
+          val file = new File(tempDir, fileName)
+          assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
+          util.stringToFile(file, fileName)
+        }
+
+        val fileCatalog = new ListingFileCatalog(
+          sparkSession = spark,
+          paths = Seq(new Path(tempDir)),
+          parameters = Map.empty[String, String],
+          partitionSchema = None)
+        // This should not fail.
+        fileCatalog.listLeafFiles(Seq(new Path(tempDir)))
+
+        // Also have an integration test.
+        checkAnswer(
+          spark.read.text(tempDir).select("p1", "p2", "p3", "value"),
+          Row(1, 2, 3, "p1=1/p2=2/p3=3/file1") :: Row(1, 3, 3, "p1=1/p2=3/p3=3/file1") :: Nil)
+      }
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =
@@ -530,3 +562,14 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
     Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
   }
 }
+
+// This file system is for SPARK-14959 (DistributedFileSystem will throw an exception
+// if we call getFileBlockLocations on a directory).
+class MockDistributedFileSystem extends RawLocalFileSystem {
+
+  override def getFileBlockLocations(
+      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path cannot be a directory.")
+    super.getFileBlockLocations(file, start, len)
+  }
+}
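
To see what the mock buys the test, here is a hedged sketch of wiring it up outside the suite. The fs.file.impl.disable.cache setting is an assumption needed because FileSystem.get caches instances per scheme:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    // Route the file:// scheme through the mock so any getFileBlockLocations call
    // on a directory fails fast, just as HDFS's DistributedFileSystem would.
    conf.set("fs.file.impl", classOf[MockDistributedFileSystem].getName)
    conf.setBoolean("fs.file.impl.disable.cache", true)

    val dir = new Path("file:///tmp")
    val fs = FileSystem.get(dir.toUri, conf)
    val dirStatus = fs.getFileStatus(dir)
    // Before this patch, listing could reach the following call with a directory
    // status; with the mock installed it now throws IllegalArgumentException:
    // fs.getFileBlockLocations(dirStatus, 0, dirStatus.getLen)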