author    Burak Yavuz <brkyvz@gmail.com>    2016-09-21 17:12:52 -0700
committer Shixiong Zhu <shixiong@databricks.com>    2016-09-21 17:12:52 -0700
commit    7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0 (patch)
tree      91620fefef7ee75bbc6956edb2d49f7dcd38d0ac
parent    8c3ee2bc42e6320b9341cebdba51a00162c897ea (diff)
[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster
## What changes were proposed in this pull request?

While getting the batch for a `FileStreamSource` in Structured Streaming, we already know exactly which files to take: we have verified that they exist and have committed them to a metadata log. When creating the `FileSourceRelation` for an incremental execution, however, the code checks the existence of every single file once again. When there are 100,000s of files in a folder on S3, creating the first batch takes 2+ hours. This PR disables that redundant check.

## How was this patch tested?

Added a unit test to `FileStreamSourceSuite`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15122 from brkyvz/SPARK-17569.
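To make the reasoning concrete, here is a minimal, hypothetical sketch of the pattern this patch applies (simplified stand-ins, not Spark's actual classes): once a file's existence has been verified and recorded in the metadata log, a later batch can trust the log instead of re-checking every file.

```scala
object SkipRecheckSketch {
  final case class FileEntry(path: String, timestampMs: Long)

  // Stand-in for fs.exists: on S3 this costs one HTTP round trip per file.
  def exists(path: String): Boolean = true

  def planBatch(logged: Seq[FileEntry], checkFilesExist: Boolean): Seq[FileEntry] = {
    if (checkFilesExist) {
      // O(n) round trips; with 100,000s of files this dominates batch creation.
      logged.foreach(f => require(exists(f.path), s"Path does not exist: ${f.path}"))
    }
    logged
  }

  def main(args: Array[String]): Unit = {
    // Entries were verified when appended to the metadata log, so skip rechecking.
    val logged = Seq(FileEntry("s3a://bucket/part-00000", 100L)) // hypothetical entry
    println(planBatch(logged, checkFilesExist = false).size)
  }
}
```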
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala           | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala       |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala  | 53
3 files changed, 62 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 93154bd2ca..413976a7ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -316,8 +316,14 @@ case class DataSource(
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
+ *
+ * @param checkFilesExist Whether to confirm that the files exist when generating the
+ *                        non-streaming file based datasource. Structured Streaming jobs already
+ *                        verify file existence, and when generating incremental batches, the
+ *                        batch is treated as a non-streaming file based data source. Since we
+ *                        know that the files already exist, we don't need to check them again.
*/
- def resolveRelation(): BaseRelation = {
+ def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
@@ -368,7 +374,7 @@ case class DataSource(
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
- if (!fs.exists(globPath.head)) {
+ if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
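To illustrate the new parameter, here is a hedged sketch of how a caller might use it; the `spark` session and the S3 path are assumptions for illustration, not code from this patch.

```scala
// Sketch: the default (checkFilesExist = true) preserves the existing behavior,
// so only streaming callers that trust the metadata log opt out of the check.
val dataSource = DataSource(
  sparkSession = spark,                          // assumed to be in scope
  className = "parquet",
  options = Map("path" -> "s3a://bucket/input")) // hypothetical path

// Batch query: validate paths up front, as before.
val batchRelation = dataSource.resolveRelation()

// Incremental streaming batch: files were verified when logged, skip fs.exists.
val streamingRelation = dataSource.resolveRelation(checkFilesExist = false)
```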
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0dc08b1467..5ebc083a7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -133,7 +133,8 @@ class FileStreamSource(
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = sourceOptions.optionMapWithoutPath)
- Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
+ Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
+ checkFilesExist = false)))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index c6db2fd3f9..e8fa6a59c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -17,9 +17,19 @@
package org.apache.spark.sql.execution.streaming
+import java.io.File
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
-class FileStreamSourceSuite extends SparkFunSuite {
+class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
import FileStreamSource._
@@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite {
assert(map.isNewFile(FileEntry("b", 10)))
}
+ testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
+ withTempDir { temp =>
+ spark.conf.set(
+ s"fs.$scheme.impl",
+ classOf[ExistsThrowsExceptionFileSystem].getName)
+ // add the metadata entries as a pre-req
+ val dir = new File(temp, "dir") // use a non-existent directory to test whether the log creates it
+ val metadataLog =
+ new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
+ assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L))))
+
+ val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
+ dir.getAbsolutePath, Map.empty)
+ // this should not throw; ExistsThrowsExceptionFileSystem throws if `fs.exists` is called during resolveRelation
+ newSource.getBatch(None, LongOffset(1))
+ }
+ }
+}
+
+/** Fake `FileSystem` to test whether the method `fs.exists` is called during
+ * `DataSource.resolveRelation`.
+ */
+class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
+ override def getUri: URI = {
+ URI.create(s"$scheme:///")
+ }
+
+ override def exists(f: Path): Boolean = {
+ throw new IllegalArgumentException("Exists shouldn't have been called!")
+ }
+
+ /** Return a single placeholder FileStatus for the given path, so that listing succeeds. */
+ override def listStatus(file: Path): Array[FileStatus] = {
+ val emptyFile = new FileStatus()
+ emptyFile.setPath(file)
+ Array(emptyFile)
+ }
+}
+
+object ExistsThrowsExceptionFileSystem {
+ val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
}
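As a usage note, the test works because Hadoop instantiates the `FileSystem` class registered under `fs.<scheme>.impl` for any URI with that scheme. A minimal sketch outside of Spark, reusing the fake file system defined above (assuming the Hadoop client is on the classpath):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ExistsThrowsExceptionFileSystemDemo {
  def main(args: Array[String]): Unit = {
    val scheme = ExistsThrowsExceptionFileSystem.scheme
    val conf = new Configuration()
    // Hadoop resolves a URI's scheme to a FileSystem class via fs.<scheme>.impl.
    conf.set(s"fs.$scheme.impl", classOf[ExistsThrowsExceptionFileSystem].getName)

    val fs = FileSystem.get(URI.create(s"$scheme:///"), conf)
    // Listing succeeds: the fake file system returns a single placeholder status.
    println(fs.listStatus(new Path(s"$scheme:///")).length) // prints 1
    // fs.exists(new Path(s"$scheme:///file1")) would throw IllegalArgumentException.
  }
}
```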