aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-09-22 13:05:41 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-09-22 13:05:41 -0700
commit85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c (patch)
treedaa705aed500801fbf0d3487a67ef951bbbdf23d
parent9f24a17c59b1130d97efa7d313c06577f7344338 (diff)
downloadspark-85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c.tar.gz
spark-85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c.tar.bz2
spark-85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c.zip
[SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames
## What changes were proposed in this pull request? Consider you have a bucket as `s3a://some-bucket` and under it you have files: ``` s3a://some-bucket/file1.parquet s3a://some-bucket/file2.parquet ``` Getting the parent path of `s3a://some-bucket/file1.parquet` yields `s3a://some-bucket/` and the ListingFileCatalog uses this as the key in the hash map. When catalog.allFiles is called, we use `s3a://some-bucket` (no slash at the end) to get the list of files, and we're left with an empty list! This PR fixes this by adding a `/` at the end of the `URI` iff the given `Path` doesn't have a parent, i.e. is the root. This is a no-op if the path already had a `/` at the end, and is handled through the Hadoop Path, path merging semantics. ## How was this patch tested? Unit test in `FileCatalogSuite`. Author: Burak Yavuz <brkyvz@gmail.com> Closes #15169 from brkyvz/SPARK-17613.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala45
2 files changed, 53 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index d2d5b56c82..702ba97222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog(
paths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = path.getFileSystem(hadoopConf)
- val qualifiedPath = fs.makeQualified(path)
+ val qualifiedPathPre = fs.makeQualified(path)
+ val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
+ // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
+ // because the `leafFile.getParent` would have returned an absolute path with the
+ // separator at the end.
+ new Path(qualifiedPathPre, Path.SEPARATOR)
+ } else {
+ qualifiedPathPre
+ }
// There are three cases possible with each path
// 1. The path is a directory and has children files in it. Then it must be present in
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 5c8d3226e9..fa3abd0098 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -18,10 +18,12 @@
package org.apache.spark.sql.execution.datasources
import java.io.File
+import java.net.URI
+import scala.collection.mutable
import scala.language.reflectiveCalls
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext
@@ -78,4 +80,45 @@ class FileCatalogSuite extends SharedSQLContext {
assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
}
}
+
+ test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+ class MockCatalog(
+ override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+
+ override def refresh(): Unit = {}
+
+ override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap(
+ new Path("mockFs://some-bucket/file1.json") -> new FileStatus()
+ )
+
+ override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
+ new Path("mockFs://some-bucket/") -> Array(new FileStatus())
+ )
+
+ override def partitionSpec(): PartitionSpec = {
+ PartitionSpec.emptySpec
+ }
+ }
+
+ withSQLConf(
+ "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName,
+ "fs.mockFs.impl.disable.cache" -> "true") {
+ val pathWithSlash = new Path("mockFs://some-bucket/")
+ assert(pathWithSlash.getParent === null)
+ val pathWithoutSlash = new Path("mockFs://some-bucket")
+ assert(pathWithoutSlash.getParent === null)
+ val catalog1 = new MockCatalog(Seq(pathWithSlash))
+ val catalog2 = new MockCatalog(Seq(pathWithoutSlash))
+ assert(catalog1.allFiles().nonEmpty)
+ assert(catalog2.allFiles().nonEmpty)
+ }
+ }
+}
+
+class FakeParentPathFileSystem extends RawLocalFileSystem {
+ override def getScheme: String = "mockFs"
+
+ override def getUri: URI = {
+ URI.create("mockFs://some-bucket")
+ }
}