aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-05-06 15:04:16 -0700
committerYin Huai <yhuai@databricks.com>2016-05-06 15:04:16 -0700
commitf7b7ef41662d7d02fc4f834f3c6c4ee8802e949c (patch)
tree715c731c578d7ebe519ae3b0473882164a418a20 /sql
parente20cd9f4ce977739ce80a2c39f8ebae5e53f72f6 (diff)
downloadspark-f7b7ef41662d7d02fc4f834f3c6c4ee8802e949c.tar.gz
spark-f7b7ef41662d7d02fc4f834f3c6c4ee8802e949c.tar.bz2
spark-f7b7ef41662d7d02fc4f834f3c6c4ee8802e949c.zip
[SPARK-14997][SQL] Fixed FileCatalog to return correct set of files when there is no partitioning scheme in the given paths
## What changes were proposed in this pull request? Lets says there are json files in the following directories structure ``` xyz/file0.json xyz/subdir1/file1.json xyz/subdir2/file2.json xyz/subdir1/subsubdir1/file3.json ``` `sqlContext.read.json("xyz")` should read only file0.json according to behavior in Spark 1.6.1. However in current master, all the 4 files are read. The fix is to make FileCatalog return only the children files of the given path if there is not partitioning detected (instead of all the recursive list of files). Closes #12774 ## How was this patch tested? unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #12856 from tdas/SPARK-14997.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala24
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala68
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala47
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala15
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala232
5 files changed, 356 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 2c44b399cb..5f04a6c60d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -61,7 +61,29 @@ abstract class PartitioningAwareFileCatalog(
}
}
- override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
+ override def allFiles(): Seq[FileStatus] = {
+ if (partitionSpec().partitionColumns.isEmpty) {
+ // For each of the input paths, get the list of files inside them
+ paths.flatMap { path =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val fs = path.getFileSystem(hadoopConf)
+ val qualifiedPath = fs.makeQualified(path)
+
+ // There are three cases possible with each path
+ // 1. The path is a directory and has children files in it. Then it must be present in
+ // leafDirToChildrenFiles as those children files will have been found as leaf files.
+ // Find its children files from leafDirToChildrenFiles and include them.
+ // 2. The path is a file, then it will be present in leafFiles. Include this path.
+ // 3. The path is a directory, but has no children files. Do not include this path.
+
+ leafDirToChildrenFiles.get(qualifiedPath)
+ .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
+ .getOrElse(Array.empty)
+ }
+ } else {
+ leafFiles.values.toSeq
+ }
+ }
protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
new file mode 100644
index 0000000000..dab5c76200
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileCatalogSuite extends SharedSQLContext {
+
+ test("ListingFileCatalog: leaf files are qualified paths") {
+ withTempDir { dir =>
+ val file = new File(dir, "text.txt")
+ stringToFile(file, "text")
+
+ val path = new Path(file.getCanonicalPath)
+ val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) {
+ def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+ def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+ }
+ assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
+ assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
+ }
+ }
+
+ test("ListingFileCatalog: input paths are converted to qualified paths") {
+ withTempDir { dir =>
+ val file = new File(dir, "text.txt")
+ stringToFile(file, "text")
+
+ val unqualifiedDirPath = new Path(dir.getCanonicalPath)
+ val unqualifiedFilePath = new Path(file.getCanonicalPath)
+ require(!unqualifiedDirPath.toString.contains("file:"))
+ require(!unqualifiedFilePath.toString.contains("file:"))
+
+ val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+ val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
+ require(qualifiedFilePath.toString.startsWith("file:"))
+
+ val catalog1 = new ListingFileCatalog(
+ sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
+ assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+ val catalog2 = new ListingFileCatalog(
+ sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
+ assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index cb2c2522b2..b4d35be05d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -765,6 +765,53 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
+ test("use basePath and file globbing to selectively load partitioned table") {
+ withTempPath { dir =>
+
+ val df = Seq(
+ (1, "foo", 100),
+ (1, "bar", 200),
+ (2, "foo", 300),
+ (2, "bar", 400)
+ ).toDF("p1", "p2", "v")
+ df.write
+ .mode(SaveMode.Overwrite)
+ .partitionBy("p1", "p2")
+ .parquet(dir.getCanonicalPath)
+
+ def check(path: String, basePath: String, expectedDf: DataFrame): Unit = {
+ val testDf = sqlContext.read
+ .option("basePath", basePath)
+ .parquet(path)
+ checkAnswer(testDf, expectedDf)
+ }
+
+ // Should find all the data with partitioning columns when base path is set to the root
+ val resultDf = df.select("v", "p1", "p2")
+ check(path = s"$dir", basePath = s"$dir", resultDf)
+ check(path = s"$dir/*", basePath = s"$dir", resultDf)
+ check(path = s"$dir/*/*", basePath = s"$dir", resultDf)
+ check(path = s"$dir/*/*/*", basePath = s"$dir", resultDf)
+
+ // Should find selective partitions of the data if the base path is not set to root
+
+ check( // read from ../p1=1 with base ../p1=1, should not infer p1 col
+ path = s"$dir/p1=1/*",
+ basePath = s"$dir/p1=1/",
+ resultDf.filter("p1 = 1").drop("p1"))
+
+ check( // red from ../p1=1/p2=foo with base ../p1=1/ should not infer p1
+ path = s"$dir/p1=1/p2=foo/*",
+ basePath = s"$dir/p1=1/",
+ resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1"))
+
+ check( // red from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer p1, p2
+ path = s"$dir/p1=1/p2=foo/*",
+ basePath = s"$dir/p1=1/p2=foo/",
+ resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1", "p2"))
+ }
+ }
+
test("_SUCCESS should not break partitioning discovery") {
Seq(1, 32).foreach { threshold =>
// We have two paths to list files, one at driver side, another one that we use
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bc5c0c1f69..a62852b512 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming
import java.io.File
+import java.util.UUID
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
AddParquetFileData(seq.toDS().toDF(), src, tmp)
}
+ /** Write parquet files in a temp dir, and move the individual files to the 'src' dir */
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
- val file = Utils.tempFileWith(new File(tmp, "parquet"))
- df.write.parquet(file.getCanonicalPath)
- file.renameTo(new File(src, file.getName))
+ val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
+ df.write.parquet(tmpDir.getCanonicalPath)
+ tmpDir.listFiles().foreach { f =>
+ f.renameTo(new File(src, s"${f.getName}"))
+ }
}
}
@@ -210,8 +214,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
test("FileStreamSource schema: parquet, existing files, no schema") {
withTempDir { src =>
- Seq("a", "b", "c").toDS().as("userColumn").toDF()
- .write.parquet(new File(src, "1").getCanonicalPath)
+ Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+ .mode(org.apache.spark.sql.SaveMode.Overwrite)
+ .parquet(src.getCanonicalPath)
val schema = createFileStreamSourceAndGetSchema(
format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
assert(schema === new StructType().add("value", StringType))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 67b403a9bd..20c5f72ff1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.sources
+import java.io.File
+
import scala.util.Random
import org.apache.hadoop.fs.Path
@@ -486,40 +488,222 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
}
- test("Hadoop style globbing") {
+ test("load() - with directory of unpartitioned data in nested subdirs") {
+ withTempPath { dir =>
+ val subdir = new File(dir, "subdir")
+
+ val dataInDir = Seq(1, 2, 3).toDF("value")
+ val dataInSubdir = Seq(4, 5, 6).toDF("value")
+
+ /*
+
+ Directory structure to be generated
+
+ dir
+ |
+ |___ [ files of dataInDir ]
+ |
+ |___ subsubdir
+ |
+ |___ [ files of dataInSubdir ]
+ */
+
+ // Generated dataInSubdir, not data in dir
+ dataInSubdir.write
+ .format(dataSourceName)
+ .mode(SaveMode.Overwrite)
+ .save(subdir.getCanonicalPath)
+
+ // Inferring schema should throw error as it should not find any file to infer
+ val e = intercept[Exception] {
+ sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath)
+ }
+
+ e match {
+ case _: AnalysisException =>
+ assert(e.getMessage.contains("infer"))
+
+ case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
+ // Ignore error, the source format requires schema to be provided by user
+ // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema
+
+ case _ =>
+ fail("Unexpected error trying to infer schema from empty dir", e)
+ }
+
+ /** Test whether data is read with the given path matches the expected answer */
+ def testWithPath(path: File, expectedAnswer: Seq[Row]): Unit = {
+ val df = sqlContext.read
+ .format(dataSourceName)
+ .schema(dataInDir.schema) // avoid schema inference for any format
+ .load(path.getCanonicalPath)
+ checkAnswer(df, expectedAnswer)
+ }
+
+ // Verify that reading by path 'dir/' gives empty results as there are no files in 'file'
+ // and it should not pick up files in 'dir/subdir'
+ require(subdir.exists)
+ require(subdir.listFiles().exists(!_.isDirectory))
+ testWithPath(dir, Seq.empty)
+
+ // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir
+ dataInDir.write
+ .format(dataSourceName)
+ .mode(SaveMode.Append) // append to prevent subdir from being deleted
+ .save(dir.getCanonicalPath)
+ require(dir.listFiles().exists(!_.isDirectory))
+ require(subdir.exists())
+ require(subdir.listFiles().exists(!_.isDirectory))
+ testWithPath(dir, dataInDir.collect())
+ }
+ }
+
+ test("Hadoop style globbing - unpartitioned data") {
withTempPath { file =>
+
+ val dir = file.getCanonicalPath
+ val subdir = new File(dir, "subdir")
+ val subsubdir = new File(subdir, "subsubdir")
+ val anotherSubsubdir =
+ new File(new File(dir, "another-subdir"), "another-subsubdir")
+
+ val dataInSubdir = Seq(1, 2, 3).toDF("value")
+ val dataInSubsubdir = Seq(4, 5, 6).toDF("value")
+ val dataInAnotherSubsubdir = Seq(7, 8, 9).toDF("value")
+
+ dataInSubdir.write
+ .format (dataSourceName)
+ .mode (SaveMode.Overwrite)
+ .save (subdir.getCanonicalPath)
+
+ dataInSubsubdir.write
+ .format (dataSourceName)
+ .mode (SaveMode.Overwrite)
+ .save (subsubdir.getCanonicalPath)
+
+ dataInAnotherSubsubdir.write
+ .format (dataSourceName)
+ .mode (SaveMode.Overwrite)
+ .save (anotherSubsubdir.getCanonicalPath)
+
+ require(subdir.exists)
+ require(subdir.listFiles().exists(!_.isDirectory))
+ require(subsubdir.exists)
+ require(subsubdir.listFiles().exists(!_.isDirectory))
+ require(anotherSubsubdir.exists)
+ require(anotherSubsubdir.listFiles().exists(!_.isDirectory))
+
+ /*
+ Directory structure generated
+
+ dir
+ |
+ |___ subdir
+ | |
+ | |___ [ files of dataInSubdir ]
+ | |
+ | |___ subsubdir
+ | |
+ | |___ [ files of dataInSubsubdir ]
+ |
+ |
+ |___ anotherSubdir
+ |
+ |___ anotherSubsubdir
+ |
+ |___ [ files of dataInAnotherSubsubdir ]
+ */
+
+ val schema = dataInSubdir.schema
+
+ /** Check whether data is read with the given path matches the expected answer */
+ def check(path: String, expectedDf: DataFrame): Unit = {
+ val df = sqlContext.read
+ .format(dataSourceName)
+ .schema(schema) // avoid schema inference for any format, expected to be same format
+ .load(path)
+ checkAnswer(df, expectedDf)
+ }
+
+ check(s"$dir/*/", dataInSubdir)
+ check(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir))
+ check(s"$dir/another*/*", dataInAnotherSubsubdir)
+ check(s"$dir/*/another*", dataInAnotherSubsubdir)
+ check(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir))
+ }
+ }
+
+ test("Hadoop style globbing - partitioned data with schema inference") {
+
+ // Tests the following on partition data
+ // - partitions are not discovered with globbing and without base path set.
+ // - partitions are discovered with globbing and base path set, though more detailed
+ // tests for this is in ParquetPartitionDiscoverySuite
+
+ withTempPath { path =>
+ val dir = path.getCanonicalPath
partitionedTestDF.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
- .save(file.getCanonicalPath)
+ .save(dir)
+
+ def check(
+ path: String,
+ expectedResult: Either[DataFrame, String],
+ basePath: Option[String] = None
+ ): Unit = {
+ try {
+ val reader = sqlContext.read
+ basePath.foreach(reader.option("basePath", _))
+ val testDf = reader
+ .format(dataSourceName)
+ .load(path)
+ assert(expectedResult.isLeft, s"Error was expected with $path but result found")
+ checkAnswer(testDf, expectedResult.left.get)
+ } catch {
+ case e: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
+ // Ignore error, the source format requires schema to be provided by user
+ // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema
+
+ case e: Throwable =>
+ assert(expectedResult.isRight, s"Was not expecting error with $path: " + e)
+ assert(
+ e.getMessage.contains(expectedResult.right.get),
+ s"Did not find expected error message wiht $path")
+ }
+ }
- val df = sqlContext.read
- .format(dataSourceName)
- .option("dataSchema", dataSchema.json)
- .option("basePath", file.getCanonicalPath)
- .load(s"${file.getCanonicalPath}/p1=*/p2=???")
-
- val expectedPaths = Set(
- s"${file.getCanonicalFile}/p1=1/p2=foo",
- s"${file.getCanonicalFile}/p1=2/p2=foo",
- s"${file.getCanonicalFile}/p1=1/p2=bar",
- s"${file.getCanonicalFile}/p1=2/p2=bar"
- ).map { p =>
- val path = new Path(p)
- val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf())
- path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+ object Error {
+ def apply(msg: String): Either[DataFrame, String] = Right(msg)
}
- val actualPaths = df.queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: HadoopFsRelation, _, _) =>
- relation.location.paths.map(_.toString).toSet
- }.getOrElse {
- fail("Expect an FSBasedRelation, but none could be found")
+ object Result {
+ def apply(df: DataFrame): Either[DataFrame, String] = Left(df)
}
- assert(actualPaths === expectedPaths)
- checkAnswer(df, partitionedTestDF.collect())
+ // ---- Without base path set ----
+ // Should find all the data with partitioning columns
+ check(s"$dir", Result(partitionedTestDF))
+
+ // Should fail as globbing finds dirs without files, only subdirs in them.
+ check(s"$dir/*/", Error("please set \"basePath\""))
+ check(s"$dir/p1=*/", Error("please set \"basePath\""))
+
+ // Should not find partition columns as the globs resolve to p2 dirs
+ // with files in them
+ check(s"$dir/*/*", Result(partitionedTestDF.drop("p1", "p2")))
+ check(s"$dir/p1=*/p2=foo", Result(partitionedTestDF.filter("p2 = 'foo'").drop("p1", "p2")))
+ check(s"$dir/p1=1/p2=???", Result(partitionedTestDF.filter("p1 = 1").drop("p1", "p2")))
+
+ // Should find all data without the partitioning columns as the globs resolve to the files
+ check(s"$dir/*/*/*", Result(partitionedTestDF.drop("p1", "p2")))
+
+ // ---- With base path set ----
+ val resultDf = partitionedTestDF.select("a", "b", "p1", "p2")
+ check(path = s"$dir/*", Result(resultDf), basePath = Some(dir))
+ check(path = s"$dir/*/*", Result(resultDf), basePath = Some(dir))
+ check(path = s"$dir/*/*/*", Result(resultDf), basePath = Some(dir))
}
}