diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2016-09-21 17:12:52 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-09-21 17:12:52 -0700 |
commit | 7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0 (patch) | |
tree | 91620fefef7ee75bbc6956edb2d49f7dcd38d0ac /sql/core/src/test/scala | |
parent | 8c3ee2bc42e6320b9341cebdba51a00162c897ea (diff) | |
download | spark-7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0.tar.gz spark-7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0.tar.bz2 spark-7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0.zip |
[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster
## What changes were proposed in this pull request?
While getting the batch for a `FileStreamSource` in StructuredStreaming, we know which files we must take specifically. We already have verified that they exist, and have committed them to a metadata log. When creating the FileSourceRelation however for an incremental execution, the code checks the existence of every single file once again!
When you have 100,000s of files in a folder, creating the first batch takes 2 hours+ when working with S3! This PR disables that check
## How was this patch tested?
Added a unit test to `FileStreamSource`.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #15122 from brkyvz/SPARK-17569.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala | 53 |
1 files changed, 52 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala index c6db2fd3f9..e8fa6a59c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala @@ -17,9 +17,19 @@ package org.apache.spark.sql.execution.streaming +import java.io.File +import java.net.URI + +import scala.util.Random + +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType -class FileStreamSourceSuite extends SparkFunSuite { +class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext { import FileStreamSource._ @@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite { assert(map.isNewFile(FileEntry("b", 10))) } + testWithUninterruptibleThread("do not recheck that files exist during getBatch") { + withTempDir { temp => + spark.conf.set( + s"fs.$scheme.impl", + classOf[ExistsThrowsExceptionFileSystem].getName) + // add the metadata entries as a pre-req + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = + new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath) + assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L)))) + + val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), + dir.getAbsolutePath, Map.empty) + // this method should throw an exception if `fs.exists` is called during resolveRelation + newSource.getBatch(None, LongOffset(1)) + } + } +} + +/** Fake FileSystem to test whether the method `fs.exists` is called during + * `DataSource.resolveRelation`. + */ +class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { + override def getUri: URI = { + URI.create(s"$scheme:///") + } + + override def exists(f: Path): Boolean = { + throw new IllegalArgumentException("Exists shouldn't have been called!") + } + + /** Simply return an empty file for now. */ + override def listStatus(file: Path): Array[FileStatus] = { + val emptyFile = new FileStatus() + emptyFile.setPath(file) + Array(emptyFile) + } +} + +object ExistsThrowsExceptionFileSystem { + val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs" } |