aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-06-22 18:07:07 +0800
committerCheng Lian <lian@databricks.com>2016-06-22 18:07:07 +0800
commit39ad53f7ffddae5ba0ff0a76089ba671b14c44c8 (patch)
treeaa937f341ef80485cf792907c0effc1d5acf52d8 /sql/core/src/test
parentd281b0bafe6aa23085d4d2b68f0ce321f1978b50 (diff)
downloadspark-39ad53f7ffddae5ba0ff0a76089ba671b14c44c8.tar.gz
spark-39ad53f7ffddae5ba0ff0a76089ba671b14c44c8.tar.bz2
spark-39ad53f7ffddae5ba0ff0a76089ba671b14c44c8.zip
[SPARK-16121] ListingFileCatalog does not list in parallel anymore
## What changes were proposed in this pull request? Seems the fix of SPARK-14959 breaks the parallel partitioning discovery. This PR fixes the problem ## How was this patch tested? Tested manually. (This PR also adds a proper test for SPARK-14959) Author: Yin Huai <yhuai@databricks.com> Closes #13830 from yhuai/SPARK-16121.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala45
1 files changed, 44 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 67ff257b93..8d8a18fa93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
@@ -375,6 +375,38 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("SPARK-14959: Do not call getFileBlockLocations on directories") {
+ // Setting PARALLEL_PARTITION_DISCOVERY_THRESHOLD to 2. So we will first
+ // list file statues at driver side and then for the level of p2, we will list
+ // file statues in parallel.
+ withSQLConf(
+ "fs.file.impl" -> classOf[MockDistributedFileSystem].getName,
+ SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2") {
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+
+ Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName =>
+ val file = new File(tempDir, fileName)
+ assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
+ util.stringToFile(file, fileName)
+ }
+
+ val fileCatalog = new ListingFileCatalog(
+ sparkSession = spark,
+ paths = Seq(new Path(tempDir)),
+ parameters = Map.empty[String, String],
+ partitionSchema = None)
+ // This should not fail.
+ fileCatalog.listLeafFiles(Seq(new Path(tempDir)))
+
+ // Also have an integration test.
+ checkAnswer(
+ spark.read.text(tempDir).select("p1", "p2", "p3", "value"),
+ Row(1, 2, 3, "p1=1/p2=2/p3=3/file1") :: Row(1, 3, 3, "p1=1/p2=3/p3=3/file1") :: Nil)
+ }
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =
@@ -530,3 +562,14 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
}
}
+
+// This file system is for SPARK-14959 (DistributedFileSystem will throw an exception
+// if we call getFileBlockLocations on a dir).
+class MockDistributedFileSystem extends RawLocalFileSystem {
+
+ override def getFileBlockLocations(
+ file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+ require(!file.isDirectory, "The file path can not be a directory.")
+ super.getFileBlockLocations(file, start, len)
+ }
+}