path: root/sql/core/src
author     Liwei Lin <lwlin7@gmail.com>            2017-02-28 22:58:51 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2017-02-28 22:58:51 -0800
commit     4913c92c2fbfcc22b41afb8ce79687165392d7da (patch)
tree       3879e2eed39d386aaf67383b7f6abdb170e923f0 /sql/core/src
parent     89cd3845b6edb165236a6498dcade033975ee276 (diff)
[SPARK-19633][SS] FileSource read from FileSink
## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path. But when reading the outputs of another streaming query, the file source should use `MetadataLogFileIndex` to list files from the sink log. This patch adds that support.

## `MetadataLogFileIndex` or `InMemoryFileIndex`

```scala
spark
  .readStream
  .format(...)
  .load("/some/path")  // for a non-glob path:
                       //   - use `MetadataLogFileIndex` when `/some/path/_spark_metadata` exists
                       //   - fall back to `InMemoryFileIndex` otherwise
```

```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*")  // for a glob path: always use `InMemoryFileIndex`
```

## How was this patch tested?

Two newly added tests.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16987 from lw-lin/source-read-from-sink.
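For orientation, here is a minimal end-to-end sketch of the behaviour this patch enables, mirroring the two tests added below. The `spark` session, paths, and checkpoint locations are illustrative, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("sink-to-source").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

// q1 writes text files plus a _spark_metadata sink log under /tmp/q1-out
val q1Input = MemoryStream[String]
val q1 = q1Input.toDF()
  .writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/q1-checkpoint")
  .start("/tmp/q1-out")

// q2 reads q1's output; because /tmp/q1-out is a non-glob path containing
// _spark_metadata, the file source lists files from the sink log rather than
// from the file system, so files not committed by q1 are never picked up
val q2 = spark.readStream
  .format("text")
  .load("/tmp/q1-out")
  .writeStream
  .format("console")
  .start()

q1Input.addData("keep1", "keep2")
q1.processAllAvailable()
q2.processAllAvailable()
```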
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala     |  26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala    |  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  |  63
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala       | 117
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                  |   8
5 files changed, 200 insertions(+), 41 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d510581f90..c1353d41e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -280,28 +280,6 @@ case class DataSource(
}
/**
- * Returns true if there is a single path that has a metadata log indicating which files should
- * be read.
- */
- def hasMetadata(path: Seq[String]): Boolean = {
- path match {
- case Seq(singlePath) =>
- try {
- val hdfsPath = new Path(singlePath)
- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
- val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
- val res = fs.exists(metadataPath)
- res
- } catch {
- case NonFatal(e) =>
- logWarning(s"Error while looking for metadata directory.")
- false
- }
- case _ => false
- }
- }
-
- /**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
*
@@ -331,7 +309,9 @@ case class DataSource(
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
- if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+ if FileStreamSink.hasMetadata(
+ caseInsensitiveOptions.get("path").toSeq ++ paths,
+ sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
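As a usage note on the hunk above: `resolveRelation` takes the `MetadataLogFileIndex` branch whenever the single, non-glob load path contains `_spark_metadata`, so a plain batch read of a file sink's output only sees committed files. A hedged sketch, assuming an existing `spark` session and an illustrative path:

```scala
// Files present on disk but absent from the sink log (e.g. half-written output) are ignored.
val committedOnly = spark.read.format("text").load("/path/written/by/a/file/sink")
```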
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 0dbe2a71ed..07ec4e9429 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution.streaming
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
@@ -25,9 +28,31 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
-object FileStreamSink {
+object FileStreamSink extends Logging {
// The name of the subdirectory that is used to store metadata about which files are valid.
val metadataDir = "_spark_metadata"
+
+ /**
+ * Returns true if there is a single path that has a metadata log indicating which files should
+ * be read.
+ */
+ def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+ path match {
+ case Seq(singlePath) =>
+ try {
+ val hdfsPath = new Path(singlePath)
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ val metadataPath = new Path(hdfsPath, metadataDir)
+ val res = fs.exists(metadataPath)
+ res
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Error while looking for metadata directory.")
+ false
+ }
+ case _ => false
+ }
+ }
}
/**
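A hedged usage sketch of the relocated helper: callers now pass a Hadoop `Configuration` explicitly instead of reaching into a `DataSource`'s session state. The `spark` session and the path are illustrative; the `hasMetadata` signature is the one added above.

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSink

// True only for a single, non-glob path whose _spark_metadata directory exists;
// any error during the check is logged and treated as "no metadata".
val isSinkOutput: Boolean = FileStreamSink.hasMetadata(
  Seq("/path/written/by/a/file/sink"),
  spark.sparkContext.hadoopConfiguration)
```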
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39c0b49796..6a7263ca45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
import scala.collection.JavaConverters._
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -43,8 +43,10 @@ class FileStreamSource(
private val sourceOptions = new FileStreamOptions(options)
+ private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
private val qualifiedBasePath: Path = {
- val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+ val fs = new Path(path).getFileSystem(hadoopConf)
fs.makeQualified(new Path(path)) // can contain glob patterns
}
@@ -158,13 +160,64 @@ class FileStreamSource(
}
/**
+ * If the source has a metadata log indicating which files should be read, then we should use it.
+ * Only when the user gives a non-glob path will we figure out whether the source has a
+ * metadata log.
+ *
+ * None means we don't know at the moment
+ * Some(true) means we know for sure the source DOES have metadata
+ * Some(false) means we know for sure the source DOES NOT have metadata
+ */
+ @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+ if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+ private def allFilesUsingInMemoryFileIndex() = {
+ val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+ val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+ fileIndex.allFiles()
+ }
+
+ private def allFilesUsingMetadataLogFileIndex() = {
+ // Note: if `sourceHasMetadata` is `Some(true)`, then `qualifiedBasePath` is guaranteed
+ // to be a non-glob path
+ new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+ }
+
+ /**
* Returns a list of files found, sorted by their timestamp.
*/
private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
- val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
- val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
- val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
+
+ var allFiles: Seq[FileStatus] = null
+ sourceHasMetadata match {
+ case None =>
+ if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+ sourceHasMetadata = Some(true)
+ allFiles = allFilesUsingMetadataLogFileIndex()
+ } else {
+ allFiles = allFilesUsingInMemoryFileIndex()
+ if (allFiles.isEmpty) {
+ // we still cannot decide
+ } else {
+ // decide what to use for future rounds
+ // double check whether the source has metadata, to handle the corner case where the
+ // metadata log and data files are generated only after the previous
+ // `FileStreamSink.hasMetadata` check
+ if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+ sourceHasMetadata = Some(true)
+ allFiles = allFilesUsingMetadataLogFileIndex()
+ } else {
+ sourceHasMetadata = Some(false)
+ // `allFiles` has already been fetched using InMemoryFileIndex in this round
+ }
+ }
+ }
+ case Some(true) => allFiles = allFilesUsingMetadataLogFileIndex()
+ case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
+ }
+
+ val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
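The new `fetchAllFiles()` logic is a small state machine over `sourceHasMetadata`. Below is a hedged, stand-alone sketch of that decision flow with the listing calls stubbed out as by-name parameters; `decideAndList` and its parameter names are illustrative, not part of the patch.

```scala
def decideAndList(
    sourceHasMetadata: Option[Boolean],
    pathHasMetadataDir: => Boolean,         // FileStreamSink.hasMetadata(Seq(path), hadoopConf)
    listWithInMemoryIndex: => Seq[String],  // allFilesUsingInMemoryFileIndex()
    listWithMetadataLog: => Seq[String]     // allFilesUsingMetadataLogFileIndex()
  ): (Option[Boolean], Seq[String]) = sourceHasMetadata match {
  case Some(true)  => (Some(true), listWithMetadataLog)
  case Some(false) => (Some(false), listWithInMemoryIndex)
  case None if pathHasMetadataDir =>
    (Some(true), listWithMetadataLog)
  case None =>
    val files = listWithInMemoryIndex
    if (files.isEmpty) {
      (None, files)              // still cannot decide; try again on the next fetch
    } else if (pathHasMetadataDir) {
      // double check: the sink log may have appeared only after the first check
      (Some(true), listWithMetadataLog)
    } else {
      (Some(false), files)       // the files already listed are the answer
    }
}
```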
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 5110d89c85..1586850c77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
query.nonEmpty,
"Cannot add data when there is no query for finding the active file stream source")
- val sources = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
- source.asInstanceOf[FileStreamSource]
- }
+ val sources = getSourcesFromStreamingQuery(query.get)
if (sources.isEmpty) {
throw new Exception(
"Could not find file source in the StreamExecution logical plan to add data to")
@@ -134,6 +131,14 @@ abstract class FileStreamSourceTest
}.head
}
+ protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
+ query.logicalPlan.collect {
+ case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+ source.asInstanceOf[FileStreamSource]
+ }
+ }
+
+
protected def withTempDirs(body: (File, File) => Unit) {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -388,9 +393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("a", "b", "c", "d"),
AssertOnQuery("seen files should contain only one entry") { streamExecution =>
- val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
- e.source.asInstanceOf[FileStreamSource]
- }.head
+ val source = getSourcesFromStreamingQuery(streamExecution).head
assert(source.seenFiles.size == 1)
true
}
@@ -662,6 +665,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("read data from outputs of another streaming query") {
+ withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+ withTempDirs { case (outputDir, checkpointDir) =>
+ // q1 is a streaming query that reads from memory and writes to text files
+ val q1Source = MemoryStream[String]
+ val q1 =
+ q1Source
+ .toDF()
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format("text")
+ .start(outputDir.getCanonicalPath)
+
+ // q2 is a streaming query that reads q1's text outputs
+ val q2 =
+ createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+ def q1AddData(data: String*): StreamAction =
+ Execute { _ =>
+ q1Source.addData(data)
+ q1.processAllAvailable()
+ }
+ def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+ testStream(q2)(
+ // batch 0
+ q1AddData("drop1", "keep2"),
+ q2ProcessAllAvailable(),
+ CheckAnswer("keep2"),
+
+ // batch 1
+ Assert {
+ // create a text file that won't be on q1's sink log
+ // thus even if its content contains "keep", it should NOT appear in q2's answer
+ val shouldNotKeep = new File(outputDir, "should_not_keep.txt")
+ stringToFile(shouldNotKeep, "should_not_keep!!!")
+ shouldNotKeep.exists()
+ },
+ q1AddData("keep3"),
+ q2ProcessAllAvailable(),
+ CheckAnswer("keep2", "keep3"),
+
+ // batch 2: check that things work well when the sink log gets compacted
+ q1AddData("keep4"),
+ Assert {
+ // compact interval is 3, so file "2.compact" should exist
+ new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
+ },
+ q2ProcessAllAvailable(),
+ CheckAnswer("keep2", "keep3", "keep4"),
+
+ Execute { _ => q1.stop() }
+ )
+ }
+ }
+ }
+
+ test("start before another streaming query, and read its output") {
+ withTempDirs { case (outputDir, checkpointDir) =>
+ // q1 is a streaming query that reads from memory and writes to text files
+ val q1Source = MemoryStream[String]
+ // define q1, but don't start it for now
+ val q1Write =
+ q1Source
+ .toDF()
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format("text")
+ var q1: StreamingQuery = null
+
+ val q2 = createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+ testStream(q2)(
+ AssertOnQuery { q2 =>
+ val fileSource = getSourcesFromStreamingQuery(q2).head
+ // q1 has not started yet, verify that q2 doesn't know whether q1 has metadata
+ fileSource.sourceHasMetadata === None
+ },
+ Execute { _ =>
+ q1 = q1Write.start(outputDir.getCanonicalPath)
+ q1Source.addData("drop1", "keep2")
+ q1.processAllAvailable()
+ },
+ AssertOnQuery { q2 =>
+ q2.processAllAvailable()
+ val fileSource = getSourcesFromStreamingQuery(q2).head
+ // q1 has started, verify that q2 knows q1 has metadata by now
+ fileSource.sourceHasMetadata === Some(true)
+ },
+ CheckAnswer("keep2"),
+ Execute { _ => q1.stop() }
+ )
+ }
+ }
+
test("when schema inference is turned on, should read partition data") {
def createFile(content: String, src: File, tmp: File): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
@@ -755,10 +853,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
.streamingQuery
q.processAllAvailable()
val memorySink = q.sink.asInstanceOf[MemorySink]
- val fileSource = q.logicalPlan.collect {
- case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
- source.asInstanceOf[FileStreamSource]
- }.head
+ val fileSource = getSourcesFromStreamingQuery(q).head
/** Check the data read in the last batch */
def checkLastBatchData(data: Int*): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af2f31a34d..60e2375a98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -208,6 +208,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
}
+ /** Execute arbitrary code */
+ object Execute {
+ def apply(func: StreamExecution => Any): AssertOnQuery =
+ AssertOnQuery(query => { func(query); true })
+ }
+
class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None
@@ -472,7 +478,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case a: AssertOnQuery =>
verify(currentStream != null || lastStream != null,
- "cannot assert when not stream has been started")
+ "cannot assert when no stream has been started")
val streamToAssert = Option(currentStream).getOrElse(lastStream)
verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
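A hedged usage sketch of the new `Execute` action, inside a suite that mixes in `StreamTest`; `downstreamDF`, `upstreamSource`, and `upstreamQuery` are illustrative names mirroring the "read data from outputs of another streaming query" test above.

```scala
test("drive an upstream query between actions") {
  testStream(downstreamDF)(
    Execute { _ =>                        // run arbitrary code as a test step
      upstreamSource.addData("keep2")
      upstreamQuery.processAllAvailable() // wait for the upstream sink to commit a batch
    },
    Execute { q => q.processAllAvailable() },  // the running StreamExecution is passed in
    CheckAnswer("keep2")
  )
}
```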