author     Yin Huai <yhuai@databricks.com>    2016-06-22 18:07:07 +0800
committer  Cheng Lian <lian@databricks.com>   2016-06-22 18:07:07 +0800
commit     39ad53f7ffddae5ba0ff0a76089ba671b14c44c8 (patch)
tree       aa937f341ef80485cf792907c0effc1d5acf52d8 /sql
parent     d281b0bafe6aa23085d4d2b68f0ce321f1978b50 (diff)
[SPARK-16121] ListingFileCatalog does not list in parallel anymore
## What changes were proposed in this pull request?

It seems the fix for SPARK-14959 broke parallel partition discovery. This PR fixes the problem.

## How was this patch tested?

Tested manually. (This PR also adds a proper test for SPARK-14959.)

Author: Yin Huai <yhuai@databricks.com>

Closes #13830 from yhuai/SPARK-16121.
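For readers skimming the diff below, here is a minimal, self-contained sketch of the listing strategy this change restores: parallel listing kicks in only when the number of paths reaches the discovery threshold, while driver-side listing recurses so that a wide child level can still fall back to the parallel path. The object and helper names (`ListingSketch`, `listOnDriver`, `listInParallel`) are illustrative stand-ins, not Spark's actual API.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object ListingSketch {
  // Illustrative stand-ins for HadoopFsRelation.listLeafFilesInParallel and a
  // plain driver-side fs.listStatus loop; not the real Spark helpers.
  def listInParallel(paths: Seq[Path], conf: Configuration): Seq[FileStatus] = ???
  def listOnDriver(paths: Seq[Path], conf: Configuration): Seq[FileStatus] = ???

  def listLeafFiles(paths: Seq[Path], threshold: Int, conf: Configuration): Seq[FileStatus] = {
    if (paths.length >= threshold) {
      // Enough paths: submit a Spark job and list them on the executors.
      listInParallel(paths, conf)
    } else {
      // Few paths: list their immediate children on the driver, then recurse
      // into the child directories. If that child level is wide enough, the
      // recursive call takes the parallel branch above.
      val statuses = listOnDriver(paths, conf)
      val (dirs, files) = statuses.partition(_.isDirectory)
      if (dirs.isEmpty) files
      else files ++ listLeafFiles(dirs.map(_.getPath), threshold, conf)
    }
  }
}
```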
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala      58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala     7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  45
3 files changed, 101 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fdec4e..675e755cb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
import scala.util.Try
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
cachedPartitionSpec = null
}
- protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ /**
+ * List leaf files of the given paths. This method will submit a Spark job to do parallel
+ * listing whenever there is a path having more files than the parallel partition
+ * discovery threshold.
+ */
+ protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
} else {
+ // Right now, the number of paths is less than the value of
+ // parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
+ // If any child directory has more files than the threshold, we will use parallel
+ // listing.
+
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logTrace(s"Listing $path on driver")
- Try {
- HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
- }.getOrElse(Array.empty[FileStatus])
+
+ val childStatuses = {
+ // TODO: We need to avoid using Try here.
+ val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+ if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+ }
+
+ childStatuses.map {
+ case f: LocatedFileStatus => f
+
+ // NOTE:
+ //
+ // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue an RPC for this method.
+ //
+ // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+ // be a big deal since we always use `listLeafFilesInParallel` when the number of
+ // paths exceeds the threshold.
+ case f =>
+ if (f.isDirectory) {
+ // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
+ f
+ } else {
+ HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+ }
+ }
+ }.filterNot { status =>
+ val name = status.getPath.getName
+ HadoopFsRelation.shouldFilterOut(name)
+ }
+
+ val (dirs, files) = statuses.partition(_.isDirectory)
+
+ // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
+ if (dirs.isEmpty) {
+ mutable.LinkedHashSet(files: _*)
+ } else {
+ mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
}
- mutable.LinkedHashSet(statuses: _*)
}
}
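The per-status handling above is the core of the SPARK-14959 fix: block locations are looked up only for regular files, never for directories. Below is a hedged sketch of that pattern; the wrapper object and method name are illustrative, and the diff itself goes through HadoopFsRelation.createLocatedFileStatus rather than constructing the status directly.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

object LocateSketch {
  def locate(fs: FileSystem, status: FileStatus): FileStatus = status match {
    // Already carries block locations (e.g. returned by listLocatedStatus).
    case f: LocatedFileStatus => f
    // Directories: skip getFileBlockLocations entirely (SPARK-14959).
    case f if f.isDirectory => f
    // Regular files: fetch block locations so the scheduler can use locality.
    case f =>
      new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
  }
}
```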
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 521eb7ffcc..06adaf7112 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -392,13 +392,14 @@ private[sql] object HadoopFsRelation extends Logging {
logTrace(s"Listing ${status.getPath}")
val name = status.getPath.getName.toLowerCase
if (shouldFilterOut(name)) {
- Array.empty
+ Array.empty[FileStatus]
} else {
val statuses = {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
}
+ // `statuses` does not contain any directories.
statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus => f
@@ -460,7 +461,9 @@ private[sql] object HadoopFsRelation extends Logging {
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(serializableConfiguration.value)
- Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).getOrElse(Array.empty)
+ // TODO: We need to avoid using Try here.
+ Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter))
+ .getOrElse(Array.empty[FileStatus])
}
}.map { status =>
val blockLocations = status match {
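The `listLeafFilesInParallel` hunk above only shows part of the distributed path. A rough, self-contained sketch of what that parallel listing does is below. The method name and the use of plain `(path, length, isDirectory)` tuples are simplifying assumptions: the real code ships the driver's Hadoop configuration to executors in a serializable wrapper and rebuilds proper `FileStatus` objects afterwards.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

object ParallelListingSketch {
  def listLeafPathsInParallel(sc: SparkContext, paths: Seq[String]): Seq[(String, Long, Boolean)] = {
    // One task per input path; each task lists its path on an executor.
    sc.parallelize(paths, paths.length).flatMap { p =>
      val path = new Path(p)
      val fs = path.getFileSystem(new Configuration())
      // A failed listing falls back to an empty result, mirroring the
      // Try(...).getOrElse(Array.empty[FileStatus]) in the diff above.
      scala.util.Try(fs.listStatus(path).toSeq).getOrElse(Seq.empty)
        .map(s => (s.getPath.toString, s.getLen, s.isDirectory))
    }.collect().toSeq
  }
}
```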
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 67ff257b93..8d8a18fa93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
@@ -375,6 +375,38 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("SPARK-14959: Do not call getFileBlockLocations on directories") {
+ // Setting PARALLEL_PARTITION_DISCOVERY_THRESHOLD to 2, so we will first
+ // list file statuses at the driver side and then, for the level of p2, we will list
+ // file statuses in parallel.
+ withSQLConf(
+ "fs.file.impl" -> classOf[MockDistributedFileSystem].getName,
+ SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2") {
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+
+ Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName =>
+ val file = new File(tempDir, fileName)
+ assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
+ util.stringToFile(file, fileName)
+ }
+
+ val fileCatalog = new ListingFileCatalog(
+ sparkSession = spark,
+ paths = Seq(new Path(tempDir)),
+ parameters = Map.empty[String, String],
+ partitionSchema = None)
+ // This should not fail.
+ fileCatalog.listLeafFiles(Seq(new Path(tempDir)))
+
+ // Also have an integration test.
+ checkAnswer(
+ spark.read.text(tempDir).select("p1", "p2", "p3", "value"),
+ Row(1, 2, 3, "p1=1/p2=2/p3=3/file1") :: Row(1, 3, 3, "p1=1/p2=3/p3=3/file1") :: Nil)
+ }
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =
@@ -530,3 +562,14 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
}
}
+
+// This file system is for SPARK-14959 (DistributedFileSystem will throw an exception
+// if we call getFileBlockLocations on a directory).
+class MockDistributedFileSystem extends RawLocalFileSystem {
+
+ override def getFileBlockLocations(
+ file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+ require(!file.isDirectory, "The file path cannot be a directory.")
+ super.getFileBlockLocations(file, start, len)
+ }
+}
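For reference, the `fs.file.impl` override used in the test is what wires the mock in: Hadoop resolves the `file://` scheme to whatever class that key names, so every local path touched during the test goes through `MockDistributedFileSystem` and its directory check. A small, hedged usage sketch follows, assuming the mock class from the test file is on the classpath; the cache-disable flag is an added assumption to keep a previously cached `RawLocalFileSystem` from being returned.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MockFsUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Map the file:// scheme to the mock defined in the test suite above.
    conf.set("fs.file.impl", classOf[MockDistributedFileSystem].getName)
    // Assumption: disable the FileSystem cache so the mock is actually
    // instantiated instead of a cached RawLocalFileSystem.
    conf.setBoolean("fs.file.impl.disable.cache", true)
    val fs = new Path("/tmp").getFileSystem(conf)
    println(fs.getClass.getName) // prints the mock class name
  }
}
```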