path: root/sql/core/src/test/scala
author     Burak Yavuz <brkyvz@gmail.com>          2016-09-21 17:12:52 -0700
committer  Shixiong Zhu <shixiong@databricks.com>  2016-09-21 17:12:52 -0700
commit     7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0 (patch)
tree       91620fefef7ee75bbc6956edb2d49f7dcd38d0ac /sql/core/src/test/scala
parent     8c3ee2bc42e6320b9341cebdba51a00162c897ea (diff)
[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster

## What changes were proposed in this pull request?

When getting a batch for a `FileStreamSource` in Structured Streaming, we already know exactly which files the batch must contain: we have verified that they exist and committed them to a metadata log. Yet when the `FileSourceRelation` is created for the incremental execution, the code checks the existence of every single file once again. With 100,000s of files in a folder, creating the first batch can take 2+ hours when working with S3. This PR disables that redundant check.

## How was this patch tested?

Added a unit test to `FileStreamSourceSuite`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15122 from brkyvz/SPARK-17569.
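The production-side change is not shown here, since this view is limited to the test sources, but the crux of it is resolving the relation without re-checking file existence. Below is a rough sketch of the shape of that change inside `FileStreamSource.getBatch`, assuming Spark 2.x internals and the `checkFilesExist` flag this patch threads through `DataSource.resolveRelation`; identifiers such as `files`, `schema`, and `fileFormatClassName` stand for values in `FileStreamSource`'s scope, so treat this as illustrative rather than the verbatim patch:

```scala
// Illustrative sketch: the batch's files come straight from the metadata log,
// so they are already known to exist.
val newDataSource = DataSource(
  sparkSession,
  paths = files.map(_.path),          // `files` were read back from the metadata log
  userSpecifiedSchema = Some(schema),
  className = fileFormatClassName,
  options = options)
// The crux of the fix: skip the redundant per-file fs.exists() round trips.
val relation = newDataSource.resolveRelation(checkFilesExist = false)
```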
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala  53
1 file changed, 52 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index c6db2fd3f9..e8fa6a59c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -17,9 +17,19 @@
package org.apache.spark.sql.execution.streaming
+import java.io.File
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
-class FileStreamSourceSuite extends SparkFunSuite {
+class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
import FileStreamSource._
@@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite {
assert(map.isNewFile(FileEntry("b", 10)))
}
+  testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
+    withTempDir { temp =>
+      spark.conf.set(
+        s"fs.$scheme.impl",
+        classOf[ExistsThrowsExceptionFileSystem].getName)
+      // Add the metadata entries as a prerequisite.
+      val dir = new File(temp, "dir") // a non-existent directory, to test that the log creates it
+      val metadataLog =
+        new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
+      assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L))))
+
+      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
+        dir.getAbsolutePath, Map.empty)
+      // The fake file system throws if `fs.exists` is called while resolving the relation.
+      newSource.getBatch(None, LongOffset(1))
+    }
+  }
+}
+
+/** Fake FileSystem to test whether the method `fs.exists` is called during
+ * `DataSource.resolveRelation`.
+ */
+class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def exists(f: Path): Boolean = {
+    throw new IllegalArgumentException("Exists shouldn't have been called!")
+  }
+
+  /** Return a single empty FileStatus for the requested path, so listing always succeeds. */
+  override def listStatus(file: Path): Array[FileStatus] = {
+    val emptyFile = new FileStatus()
+    emptyFile.setPath(file)
+    Array(emptyFile)
+  }
+}
+
+object ExistsThrowsExceptionFileSystem {
+  val scheme = s"FileStreamSourceSuite${Random.nextInt(Int.MaxValue)}fs" // always non-negative
}
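For readers unfamiliar with the trick above: Hadoop instantiates a `FileSystem` per URI scheme from the `fs.<scheme>.impl` configuration key, which is what lets the test swap in a file system whose `exists` always throws. A minimal standalone sketch of the same mechanism, assuming `ExistsThrowsExceptionFileSystem` is on the classpath (the test performs the equivalent registration through `spark.conf.set`, which Spark copies into its Hadoop configuration):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FakeFsDemo {
  def main(args: Array[String]): Unit = {
    val scheme = ExistsThrowsExceptionFileSystem.scheme
    val conf = new Configuration()
    // Map the fake scheme to the throwing file system implementation.
    conf.set(s"fs.$scheme.impl", classOf[ExistsThrowsExceptionFileSystem].getName)

    val fs = FileSystem.get(new URI(s"$scheme:///"), conf)
    // Listing succeeds: the fake file system returns a single empty FileStatus.
    fs.listStatus(new Path(s"$scheme:///")).foreach(println)
    // Any existence check fails loudly, which is what the test relies on:
    // fs.exists(new Path(s"$scheme:///file1"))  // throws IllegalArgumentException
  }
}
```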