aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-05-10 16:43:32 -0700
committerYin Huai <yhuai@databricks.com>2016-05-10 16:43:32 -0700
commitd9ca9fd3e582f9d29f8887c095637c93a8b93651 (patch)
tree94ef5e8c0111fd438662ca706df85a95e92b5917 /sql
parent86475520f88f90c9d3b71516f65ccc0e9d244863 (diff)
downloadspark-d9ca9fd3e582f9d29f8887c095637c93a8b93651.tar.gz
spark-d9ca9fd3e582f9d29f8887c095637c93a8b93651.tar.bz2
spark-d9ca9fd3e582f9d29f8887c095637c93a8b93651.zip
[SPARK-14837][SQL][STREAMING] Added support in file stream source for reading new files added to subdirs
## What changes were proposed in this pull request? Currently, file stream source can only find new files if they appear in the directory given to the source, but not if they appear in subdirs. This PR add support for providing glob patterns when creating file stream source so that it can find new files in nested directories based on the glob pattern. ## How was this patch tested? Unit test that tests when new files are discovered with globs and partitioned directories. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #12616 from tdas/SPARK-14837.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala40
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala88
5 files changed, 114 insertions, 37 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0342ec569d..ce45168a13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -174,25 +174,11 @@ case class DataSource(
s.createSource(sparkSession.wrapped, metadataPath, userSpecifiedSchema, className, options)
case format: FileFormat =>
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val path = caseInsensitiveOptions.getOrElse("path", {
+ val path = new CaseInsensitiveMap(options).getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
-
- def dataFrameBuilder(files: Array[String]): DataFrame = {
- val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
- val newDataSource =
- DataSource(
- sparkSession,
- paths = files,
- userSpecifiedSchema = Some(sourceInfo.schema),
- className = className,
- options = new CaseInsensitiveMap(newOptions))
- Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
- }
-
new FileStreamSource(
- sparkSession, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
+ sparkSession, path, className, sourceInfo.schema, metadataPath, options)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index bdf43e02f4..5cee2b9af6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -53,6 +53,7 @@ class ListingFileCatalog(
if (cachedPartitionSpec == null) {
cachedPartitionSpec = inferPartitioning()
}
+ logTrace(s"Partition spec: $cachedPartitionSpec")
cachedPartitionSpec
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 5f04a6c60d..27f23c855d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -49,7 +49,7 @@ abstract class PartitioningAwareFileCatalog(
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
- if (partitionSpec().partitionColumns.isEmpty) {
+ val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
@@ -59,6 +59,8 @@ abstract class PartitioningAwareFileCatalog(
leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
}
}
+ logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+ selectedPartitions
}
override def allFiles(): Seq[FileStatus] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 7b4c035bf3..bef56160f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.fs.Path
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.OpenHashSet
/**
@@ -33,12 +35,14 @@ import org.apache.spark.util.collection.OpenHashSet
*/
class FileStreamSource(
sparkSession: SparkSession,
- metadataPath: String,
path: String,
+ fileFormatClassName: String,
override val schema: StructType,
- dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
+ metadataPath: String,
+ options: Map[String, String]) extends Source with Logging {
private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+ private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
@@ -69,6 +73,7 @@ class FileStreamSource(
if (newFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, newFiles)
+ logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
}
new LongOffset(maxBatchId)
@@ -97,21 +102,30 @@ class FileStreamSource(
assert(startId <= endId)
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
- logDebug(s"Streaming ${files.mkString(", ")}")
- dataFrameBuilder(files)
+ logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+ val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
+ val newDataSource =
+ DataSource(
+ sparkSession,
+ paths = files,
+ userSpecifiedSchema = Some(schema),
+ className = fileFormatClassName,
+ options = newOptions)
+ Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}
private def fetchAllFiles(): Seq[String] = {
- val startTime = System.nanoTime()
- val files = fs.listStatus(new Path(path))
- .filterNot(_.getPath.getName.startsWith("_"))
- .map(_.getPath.toUri.toString)
- val endTime = System.nanoTime()
- logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+ val startTime = System.nanoTime
+ val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+ val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
+ val files = catalog.allFiles().map(_.getPath.toUri.toString)
+ val endTime = System.nanoTime
+ logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+ logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
files
}
override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
- override def toString: String = s"FileSource[$path]"
+ override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 4b95d65627..c97304c0ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class FileStreamSourceTest extends StreamTest with SharedSQLContext {
@@ -58,7 +58,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
addData(source)
source.currentOffset + 1
}
- logInfo(s"Added data to $source at offset $newOffset")
+ logInfo(s"Added file to $source at offset $newOffset")
(source, newOffset)
}
@@ -69,8 +69,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
- val file = Utils.tempFileWith(new File(tmp, "text"))
- stringToFile(file, content).renameTo(new File(src, file.getName))
+ val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+ val finalFile = new File(src, tempFile.getName)
+ src.mkdirs()
+ require(stringToFile(tempFile, content).renameTo(finalFile))
+ logInfo(s"Written text '$content' to file $finalFile")
}
}
@@ -89,6 +92,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
df.write.parquet(tmpDir.getCanonicalPath)
+ src.mkdirs()
tmpDir.listFiles().foreach { f =>
f.renameTo(new File(src, s"${f.getName}"))
}
@@ -100,7 +104,6 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
format: String,
path: String,
schema: Option[StructType] = None): DataFrame = {
-
val reader =
if (schema.isDefined) {
spark.read.format(format).schema(schema.get)
@@ -327,7 +330,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
-
test("reading from json files inside partitioned directory") {
withTempDirs { case (baseSrc, tmp) =>
val src = new File(baseSrc, "type=X")
@@ -348,7 +350,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
-
test("reading from json files with changing schema") {
withTempDirs { case (src, tmp) =>
@@ -444,6 +445,79 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
+ test("read new files in nested directories with globbing") {
+ withTempDirs { case (dir, tmp) =>
+
+ // src/*/* should consider all the files and directories that matches that glob.
+ // So any files that matches the glob as well as any files in directories that matches
+ // this glob should be read.
+ val fileStream = createFileStream("text", s"${dir.getCanonicalPath}/*/*")
+ val filtered = fileStream.filter($"value" contains "keep")
+ val subDir = new File(dir, "subdir")
+ val subSubDir = new File(subDir, "subsubdir")
+ val subSubSubDir = new File(subSubDir, "subsubsubdir")
+
+ require(!subDir.exists())
+ require(!subSubDir.exists())
+
+ testStream(filtered)(
+ // Create new dir/subdir and write to it, should read
+ AddTextFileData("drop1\nkeep2", subDir, tmp),
+ CheckAnswer("keep2"),
+
+ // Add files to dir/subdir, should read
+ AddTextFileData("keep3", subDir, tmp),
+ CheckAnswer("keep2", "keep3"),
+
+ // Create new dir/subdir/subsubdir and write to it, should read
+ AddTextFileData("keep4", subSubDir, tmp),
+ CheckAnswer("keep2", "keep3", "keep4"),
+
+ // Add files to dir/subdir/subsubdir, should read
+ AddTextFileData("keep5", subSubDir, tmp),
+ CheckAnswer("keep2", "keep3", "keep4", "keep5"),
+
+ // 1. Add file to src dir, should not read as globbing src/*/* does not capture files in
+ // dir, only captures files in dir/subdir/
+ // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not read as src/*/* should
+ // not capture those files
+ AddTextFileData("keep6", dir, tmp),
+ AddTextFileData("keep7", subSubSubDir, tmp),
+ AddTextFileData("keep8", subDir, tmp), // needed to make query detect new data
+ CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8")
+ )
+ }
+ }
+
+ test("read new files in partitioned table with globbing, should not read partition data") {
+ withTempDirs { case (dir, tmp) =>
+ val partitionFooSubDir = new File(dir, "partition=foo")
+ val partitionBarSubDir = new File(dir, "partition=bar")
+
+ val schema = new StructType().add("value", StringType).add("partition", StringType)
+ val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/*/*", Some(schema))
+ val filtered = fileStream.filter($"value" contains "keep")
+ val nullStr = null.asInstanceOf[String]
+ testStream(filtered)(
+ // Create new partition=foo sub dir and write to it, should read only value, not partition
+ AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+ CheckAnswer(("keep2", nullStr)),
+
+ // Append to same partition=1 sub dir, should read only value, not partition
+ AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+ CheckAnswer(("keep2", nullStr), ("keep3", nullStr)),
+
+ // Create new partition sub dir and write to it, should read only value, not partition
+ AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", nullStr), ("keep3", nullStr), ("keep4", nullStr)),
+
+ // Append to same partition=2 sub dir, should read only value, not partition
+ AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+ CheckAnswer(("keep2", nullStr), ("keep3", nullStr), ("keep4", nullStr), ("keep5", nullStr))
+ )
+ }
+ }
+
test("fault tolerance") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)