author     jerryshao <sshao@hortonworks.com>        2016-09-20 10:24:12 -0700
committer  Shixiong Zhu <shixiong@databricks.com>   2016-09-20 10:24:12 -0700
commit     a6aade0042d9c065669f46d2dac40ec6ce361e63 (patch)
tree       01d6a34c34b222a6d1e406aa8de821f696cfcc67
parent     eb004c66200da7df36dd0a9a11999fc352197916 (diff)
[SPARK-15698][SQL][STREAMING] Add the ability to remove the old MetadataLog in FileStreamSource
## What changes were proposed in this pull request?

The current `metadataLog` in `FileStreamSource` adds a checkpoint file for each batch but has no way to remove or compact them, which leads to a large number of small files when running for a long time. This change compacts the old logs into one file. The approach is quite similar to `FileStreamSinkLog`, but simpler.

## How was this patch tested?

Unit test added.

Author: jerryshao <sshao@hortonworks.com>

Closes #13513 from jerryshao/SPARK-15698.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala  245
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala  212
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala  132
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala  23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala  35
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala  99
9 files changed, 550 insertions, 222 deletions
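
Editor's illustration, not part of the patch: the sketch below simulates the metadata file layout described in the commit message, with one file per batch and every `compactInterval`-th batch written as a single `<id>.compact` file that folds in all earlier entries. The object and method names here are hypothetical; only the naming rule and the default interval of 10 come from the patch.

object MetadataLayoutSketch {
  // Mirrors CompactibleFileStreamLog.isCompactionBatch / batchIdToPath: with the
  // default interval of 10, batch ids 9, 19, 29, ... become "<id>.compact".
  def fileName(batchId: Long, compactInterval: Int): String =
    if ((batchId + 1) % compactInterval == 0) s"$batchId.compact" else batchId.toString

  def main(args: Array[String]): Unit = {
    val compactInterval = 10 // default of spark.sql.streaming.fileSource.log.compactInterval
    val names = (0L to 21L).map(fileName(_, compactInterval))
    println(names.mkString(", "))
    // 0, 1, ..., 8, 9.compact, 10, ..., 18, 19.compact, 20, 21
    // Once 19.compact is visible to all readers, the files for batches 0..18
    // become eligible for deletion after the configured cleanup delay.
  }
}
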
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
new file mode 100644
index 0000000000..027b5bbfab
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An abstract class for compactible metadata logs. It writes one log file for each batch.
+ * The first line of the log file is the version number, followed by one serialized
+ * metadata entry per line.
+ *
+ * Because reading from many small files is usually slow, and too many small files in one
+ * directory can strain the file system, [[CompactibleFileStreamLog]] compacts log files
+ * into a single larger file every 10 batches by default. When doing a compaction, it reads
+ * all old log files and merges them with the new batch.
+ */
+abstract class CompactibleFileStreamLog[T: ClassTag](
+ metadataLogVersion: String,
+ sparkSession: SparkSession,
+ path: String)
+ extends HDFSMetadataLog[Array[T]](sparkSession, path) {
+
+ import CompactibleFileStreamLog._
+
+ /**
+ * If we delete the old files immediately after compaction, there is a race condition on S3:
+ * other processes may see that the old files are deleted but still fail to see the compaction
+ * file via "list". `allFiles` handles this by looking for the next compaction file directly;
+ * however, a livelock may happen if compaction occurs too frequently: one process keeps
+ * deleting old files while another keeps retrying. Setting a reasonable cleanup delay avoids it.
+ */
+ protected def fileCleanupDelayMs: Long
+
+ protected def isDeletingExpiredLog: Boolean
+
+ protected def compactInterval: Int
+
+ /**
+ * Serialize the data into encoded string.
+ */
+ protected def serializeData(t: T): String
+
+ /**
+ * Deserialize the string into data object.
+ */
+ protected def deserializeData(encodedString: String): T
+
+ /**
+ * Filter out the obsolete logs.
+ */
+ def compactLogs(logs: Seq[T]): Seq[T]
+
+ override def batchIdToPath(batchId: Long): Path = {
+ if (isCompactionBatch(batchId, compactInterval)) {
+ new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
+ } else {
+ new Path(metadataPath, batchId.toString)
+ }
+ }
+
+ override def pathToBatchId(path: Path): Long = {
+ getBatchIdFromFileName(path.getName)
+ }
+
+ override def isBatchFile(path: Path): Boolean = {
+ try {
+ getBatchIdFromFileName(path.getName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }
+
+ override def serialize(logData: Array[T]): Array[Byte] = {
+ (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8)
+ }
+
+ override def deserialize(bytes: Array[Byte]): Array[T] = {
+ val lines = new String(bytes, UTF_8).split("\n")
+ if (lines.length == 0) {
+ throw new IllegalStateException("Incomplete log file")
+ }
+ val version = lines(0)
+ if (version != metadataLogVersion) {
+ throw new IllegalStateException(s"Unknown log version: ${version}")
+ }
+ lines.slice(1, lines.length).map(deserializeData)
+ }
+
+ override def add(batchId: Long, logs: Array[T]): Boolean = {
+ if (isCompactionBatch(batchId, compactInterval)) {
+ compact(batchId, logs)
+ } else {
+ super.add(batchId, logs)
+ }
+ }
+
+ /**
+ * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
+ * corresponding `batchId` file. It will delete expired files as well if enabled.
+ */
+ private def compact(batchId: Long, logs: Array[T]): Boolean = {
+ val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
+ val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
+ if (super.add(batchId, compactLogs(allLogs).toArray)) {
+ if (isDeletingExpiredLog) {
+ deleteExpiredLog(batchId)
+ }
+ true
+ } else {
+ // Return false as there is another writer.
+ false
+ }
+ }
+
+ /**
+ * Returns all files except the deleted ones.
+ */
+ def allFiles(): Array[T] = {
+ var latestId = getLatest().map(_._1).getOrElse(-1L)
+ // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
+ // is calling this method. This loop will retry the reading to deal with the
+ // race condition.
+ while (true) {
+ if (latestId >= 0) {
+ try {
+ val logs =
+ getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten
+ return compactLogs(logs).toArray
+ } catch {
+ case e: IOException =>
+ // Another process using `CompactibleFileStreamLog` may delete the batch files while
+ // `StreamFileCatalog` is reading them. However, this only happens when a compaction is
+ // deleting old files. If so, try the next compaction batch and we should find it.
+ // Otherwise, this is a real IO issue and we should throw it.
+ latestId = nextCompactionBatchId(latestId, compactInterval)
+ super.get(latestId).getOrElse {
+ throw e
+ }
+ }
+ } else {
+ return Array.empty
+ }
+ }
+ Array.empty
+ }
+
+ /**
+ * Since all logs before `compactionBatchId` are compacted and written into the
+ * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency
+ * of S3, other processes may not see the compaction file right away. So we only delete files
+ * that were created more than `fileCleanupDelayMs` milliseconds ago.
+ */
+ private def deleteExpiredLog(compactionBatchId: Long): Unit = {
+ val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+ fileManager.list(metadataPath, new PathFilter {
+ override def accept(path: Path): Boolean = {
+ try {
+ val batchId = getBatchIdFromFileName(path.getName)
+ batchId < compactionBatchId
+ } catch {
+ case _: NumberFormatException =>
+ false
+ }
+ }
+ }).foreach { f =>
+ if (f.getModificationTime <= expiredTime) {
+ fileManager.delete(f.getPath)
+ }
+ }
+ }
+}
+
+object CompactibleFileStreamLog {
+ val COMPACT_FILE_SUFFIX = ".compact"
+
+ def getBatchIdFromFileName(fileName: String): Long = {
+ fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
+ }
+
+ /**
+ * Returns whether this is a compaction batch. The log compacts old entries every
+ * `compactInterval` commits.
+ *
+ * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
+ */
+ def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
+ (batchId + 1) % compactInterval == 0
+ }
+
+ /**
+ * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
+ * need to do a new compaction.
+ *
+ * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method returns
+ * `Seq(2, 3, 4)` (note that it includes the previous compaction batch 2).
+ */
+ def getValidBatchesBeforeCompactionBatch(
+ compactionBatchId: Long,
+ compactInterval: Int): Seq[Long] = {
+ assert(isCompactionBatch(compactionBatchId, compactInterval),
+ s"$compactionBatchId is not a compaction batch")
+ (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
+ }
+
+ /**
+ * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction batch,
+ * just returns itself. Otherwise, it finds the previous compaction batch and returns all
+ * batches between it and `batchId`.
+ */
+ def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
+ assert(batchId >= 0)
+ val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
+ start to batchId
+ }
+
+ /**
+ * Returns the next compaction batch id after `batchId`.
+ */
+ def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
+ (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
+ }
+}
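
Editor's note, not part of the patch: a small usage sketch of the batch-id helpers defined above, assuming spark-sql (and thus `CompactibleFileStreamLog`) is on the classpath, using the same numbers as the doc comments (a compact interval of 3).

import org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog._

object CompactionHelpersDemo {
  def main(args: Array[String]): Unit = {
    // With an interval of 3, batches 2, 5, 8, ... are compaction batches.
    assert(isCompactionBatch(5, 3))
    // Batch 5 is built from everything since the previous compaction batch (2).
    assert(getValidBatchesBeforeCompactionBatch(5, 3) == Seq(2L, 3L, 4L))
    // Batch 4 is not a compaction batch, so answering allFiles() needs
    // the previous compaction batch 2 plus the deltas 3 and 4.
    assert(getAllValidBatches(4, 3) == Seq(2L, 3L, 4L))
    // The first compaction batch after batch 3 is batch 5.
    assert(nextCompactionBatchId(3, 3) == 5L)
    println("all helper checks passed")
  }
}
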
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 0f7d958136..02c5b857ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -56,7 +56,8 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
- private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString)
+ private val fileLog =
+ new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val fs = basePath.getFileSystem(hadoopConf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 6f9f7c18c4..64f2f00484 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,10 +17,7 @@
package org.apache.spark.sql.execution.streaming
-import java.io.IOException
-import java.nio.charset.StandardCharsets.UTF_8
-
-import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
@@ -79,213 +76,46 @@ object SinkFileStatus {
* When the reader uses `allFiles` to list all files, this method only returns the visible files
* (drops the deleted files).
*/
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
- import FileStreamSinkLog._
+class FileStreamSinkLog(
+ metadataLogVersion: String,
+ sparkSession: SparkSession,
+ path: String)
+ extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
private implicit val formats = Serialization.formats(NoTypeHints)
- /**
- * If we delete the old files after compaction at once, there is a race condition in S3: other
- * processes may see the old files are deleted but still cannot see the compaction file using
- * "list". The `allFiles` handles this by looking for the next compaction file directly, however,
- * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
- * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
- */
- private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+ protected override val fileCleanupDelayMs =
+ sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
- private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
+ protected override val isDeletingExpiredLog =
+ sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
- private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
+ protected override val compactInterval =
+ sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
- override def batchIdToPath(batchId: Long): Path = {
- if (isCompactionBatch(batchId, compactInterval)) {
- new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
- } else {
- new Path(metadataPath, batchId.toString)
- }
- }
-
- override def pathToBatchId(path: Path): Long = {
- getBatchIdFromFileName(path.getName)
- }
-
- override def isBatchFile(path: Path): Boolean = {
- try {
- getBatchIdFromFileName(path.getName)
- true
- } catch {
- case _: NumberFormatException => false
- }
- }
-
- override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
- (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
+ protected override def serializeData(data: SinkFileStatus): String = {
+ write(data)
}
- override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
- val lines = new String(bytes, UTF_8).split("\n")
- if (lines.length == 0) {
- throw new IllegalStateException("Incomplete log file")
- }
- val version = lines(0)
- if (version != VERSION) {
- throw new IllegalStateException(s"Unknown log version: ${version}")
- }
- lines.slice(1, lines.length).map(read[SinkFileStatus](_))
- }
-
- override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
- if (isCompactionBatch(batchId, compactInterval)) {
- compact(batchId, logs)
- } else {
- super.add(batchId, logs)
- }
+ protected override def deserializeData(encodedString: String): SinkFileStatus = {
+ read[SinkFileStatus](encodedString)
}
- /**
- * Returns all files except the deleted ones.
- */
- def allFiles(): Array[SinkFileStatus] = {
- var latestId = getLatest().map(_._1).getOrElse(-1L)
- // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
- // is calling this method. This loop will retry the reading to deal with the
- // race condition.
- while (true) {
- if (latestId >= 0) {
- val startId = getAllValidBatches(latestId, compactInterval)(0)
- try {
- val logs = get(Some(startId), Some(latestId)).flatMap(_._2)
- return compactLogs(logs).toArray
- } catch {
- case e: IOException =>
- // Another process using `FileStreamSink` may delete the batch files when
- // `StreamFileCatalog` are reading. However, it only happens when a compaction is
- // deleting old files. If so, let's try the next compaction batch and we should find it.
- // Otherwise, this is a real IO issue and we should throw it.
- latestId = nextCompactionBatchId(latestId, compactInterval)
- get(latestId).getOrElse {
- throw e
- }
- }
- } else {
- return Array.empty
- }
- }
- Array.empty
- }
-
- /**
- * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
- * corresponding `batchId` file. It will delete expired files as well if enabled.
- */
- private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
- val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
- val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
- if (super.add(batchId, compactLogs(allLogs).toArray)) {
- if (isDeletingExpiredLog) {
- deleteExpiredLog(batchId)
- }
- true
+ override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
+ val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
+ if (deletedFiles.isEmpty) {
+ logs
} else {
- // Return false as there is another writer.
- false
- }
- }
-
- /**
- * Since all logs before `compactionBatchId` are compacted and written into the
- * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
- * S3, the compaction file may not be seen by other processes at once. So we only delete files
- * created `fileCleanupDelayMs` milliseconds ago.
- */
- private def deleteExpiredLog(compactionBatchId: Long): Unit = {
- val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
- fileManager.list(metadataPath, new PathFilter {
- override def accept(path: Path): Boolean = {
- try {
- val batchId = getBatchIdFromFileName(path.getName)
- batchId < compactionBatchId
- } catch {
- case _: NumberFormatException =>
- false
- }
- }
- }).foreach { f =>
- if (f.getModificationTime <= expiredTime) {
- fileManager.delete(f.getPath)
- }
+ logs.filter(f => !deletedFiles.contains(f.path))
}
}
}
object FileStreamSinkLog {
val VERSION = "v1"
- val COMPACT_FILE_SUFFIX = ".compact"
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
-
- def getBatchIdFromFileName(fileName: String): Long = {
- fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
- }
-
- /**
- * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every
- * `compactInterval` commits.
- *
- * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
- */
- def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
- (batchId + 1) % compactInterval == 0
- }
-
- /**
- * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
- * need to do a new compaction.
- *
- * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns
- * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
- */
- def getValidBatchesBeforeCompactionBatch(
- compactionBatchId: Long,
- compactInterval: Int): Seq[Long] = {
- assert(isCompactionBatch(compactionBatchId, compactInterval),
- s"$compactionBatchId is not a compaction batch")
- (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
- }
-
- /**
- * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just
- * return itself. Otherwise, it will find the previous compaction batch and return all batches
- * between it and `batchId`.
- */
- def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
- assert(batchId >= 0)
- val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
- start to batchId
- }
-
- /**
- * Removes all deleted files from logs. It assumes once one file is deleted, it won't be added to
- * the log in future.
- */
- def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
- val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet
- if (deletedFiles.isEmpty) {
- logs
- } else {
- logs.filter(f => !deletedFiles.contains(f.path))
- }
- }
-
- /**
- * Returns the next compaction batch id after `batchId`.
- */
- def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
- (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
- }
}
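
Editor's sketch, not part of the patch: the `compactLogs` override above drops every entry for a path once that path has a delete action. The stand-in `Status` class below is hypothetical and keeps only the two fields the policy actually inspects.

object SinkCompactionSketch {
  // Hypothetical stand-in for SinkFileStatus with just the fields compactLogs reads.
  case class Status(path: String, action: String)

  def compactLogs(logs: Seq[Status]): Seq[Status] = {
    val deletedFiles = logs.filter(_.action == "delete").map(_.path).toSet
    if (deletedFiles.isEmpty) logs else logs.filterNot(s => deletedFiles.contains(s.path))
  }

  def main(args: Array[String]): Unit = {
    val logs = Seq(
      Status("/a/b/x", "add"),
      Status("/a/b/y", "add"),
      Status("/a/b/y", "delete"))
    // Both the "add" and the "delete" entries for /a/b/y are dropped.
    println(compactLogs(logs)) // List(Status(/a/b/x,add))
  }
}
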
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 42fb454c2d..0dc08b1467 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -29,8 +29,6 @@ import org.apache.spark.sql.types.StructType
/**
* A very simple source that reads files from the given directory as they appear.
- *
- * TODO: Clean up the metadata log files periodically.
*/
class FileStreamSource(
sparkSession: SparkSession,
@@ -49,8 +47,8 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contains glob patterns
}
- private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
-
+ private val metadataLog =
+ new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
/** Maximum number of new files to be considered in each batch */
@@ -60,11 +58,10 @@ class FileStreamSource(
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
- metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
- entry.foreach(seenFiles.add)
- // TODO: move purge call out of the loop once we truncate logs.
- seenFiles.purge()
+ metadataLog.allFiles().foreach { entry =>
+ seenFiles.add(entry)
}
+ seenFiles.purge()
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
@@ -98,7 +95,7 @@ class FileStreamSource(
if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles.toArray)
+ metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
@@ -174,7 +171,10 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long
- case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
+ val NOT_SET = -1L
+
+ case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
+ extends Serializable
/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
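
Editor's sketch, not part of the patch, of the `batchId` stamping introduced above: entries are created with the default `NOT_SET` and copied with the real batch id when the batch is committed, which is what later lets `FileStreamSourceLog.get` filter a compacted file back down to a single batch.

object FileEntryStampSketch {
  val NOT_SET = -1L
  // Mirrors FileStreamSource.FileEntry as defined in this patch.
  case class FileEntry(path: String, timestamp: Long, batchId: Long = NOT_SET)

  def main(args: Array[String]): Unit = {
    val discovered = Seq(FileEntry("/data/f1.txt", 1000L), FileEntry("/data/f2.txt", 1001L))
    val maxBatchId = 3L
    // What metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray) records:
    val committed = discovered.map(_.copy(batchId = maxBatchId))
    println(committed.map(_.batchId)) // List(3, 3)
  }
}
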
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
new file mode 100644
index 0000000000..8103309aff
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.Map.Entry
+
+import scala.collection.mutable
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.internal.SQLConf
+
+class FileStreamSourceLog(
+ metadataLogVersion: String,
+ sparkSession: SparkSession,
+ path: String)
+ extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
+
+ import CompactibleFileStreamLog._
+
+ // Configurations about metadata compaction
+ protected override val compactInterval =
+ sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+ require(compactInterval > 0,
+ s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
+ s"positive value.")
+
+ protected override val fileCleanupDelayMs =
+ sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+
+ protected override val isDeletingExpiredLog =
+ sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+
+ private implicit val formats = Serialization.formats(NoTypeHints)
+
+ // A fixed-size cache for the file entries belonging to each compaction batch. It is
+ // used to avoid scanning the compacted log file to retrieve its own batch data.
+ private val cacheSize = compactInterval
+ private val fileEntryCache = new JLinkedHashMap[Long, Array[FileEntry]] {
+ override def removeEldestEntry(eldest: Entry[Long, Array[FileEntry]]): Boolean = {
+ size() > cacheSize
+ }
+ }
+
+ protected override def serializeData(data: FileEntry): String = {
+ Serialization.write(data)
+ }
+
+ protected override def deserializeData(encodedString: String): FileEntry = {
+ Serialization.read[FileEntry](encodedString)
+ }
+
+ def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
+ logs
+ }
+
+ override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
+ if (super.add(batchId, logs)) {
+ if (isCompactionBatch(batchId, compactInterval)) {
+ fileEntryCache.put(batchId, logs)
+ }
+ true
+ } else {
+ false
+ }
+ }
+
+ override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = {
+ val startBatchId = startId.getOrElse(0L)
+ val endBatchId = getLatest().map(_._1).getOrElse(0L)
+
+ val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id =>
+ if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
+ (id, Some(fileEntryCache.get(id)))
+ } else {
+ val logs = super.get(id).map(_.filter(_.batchId == id))
+ (id, logs)
+ }
+ }.partition(_._2.isDefined)
+
+ // The code below can only happen when the original metadata log file has been removed, so we
+ // have to get the batch from the latest compacted log file. This is quite time-consuming and
+ // should not happen in the current FileStreamSource code path, since we only fetch the
+ // latest metadata log file.
+ val searchKeys = removedBatches.map(_._1)
+ val retrievedBatches = if (searchKeys.nonEmpty) {
+ logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!")
+ val latestBatchId = getLatest().map(_._1).getOrElse(-1L)
+ if (latestBatchId < 0) {
+ Map.empty[Long, Option[Array[FileEntry]]]
+ } else {
+ val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
+ val allLogs = new mutable.HashMap[Long, mutable.ArrayBuffer[FileEntry]]
+
+ super.get(latestCompactedBatchId).foreach { entries =>
+ entries.foreach { e =>
+ allLogs.put(e.batchId, allLogs.getOrElse(e.batchId, mutable.ArrayBuffer()) += e)
+ }
+ }
+
+ searchKeys.map(id => id -> allLogs.get(id).map(_.toArray)).filter(_._2.isDefined).toMap
+ }
+ } else {
+ Map.empty[Long, Option[Array[FileEntry]]]
+ }
+
+ (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
+ }
+}
+
+object FileStreamSourceLog {
+ val VERSION = "v1"
+}
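
Editor's sketch, not part of the patch: the `fileEntryCache` above relies on the standard `java.util.LinkedHashMap` idiom of overriding `removeEldestEntry` to cap the map's size, so only the most recent compaction batches stay cached. A standalone demonstration:

import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

object EntryCacheSketch {
  def main(args: Array[String]): Unit = {
    val cacheSize = 2
    val cache = new JLinkedHashMap[Long, String] {
      // Evict the oldest entry whenever the map grows past cacheSize.
      override def removeEldestEntry(eldest: Entry[Long, String]): Boolean =
        size() > cacheSize
    }
    cache.put(1L, "batch-1")
    cache.put(3L, "batch-3")
    cache.put(5L, "batch-5")
    println(cache.keySet()) // [3, 5] -- the oldest entry (batch 1) was evicted
  }
}
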
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index 20ade12e37..a32c4671e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -34,7 +34,8 @@ class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
- private val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString)
+ private val metadataLog =
+ new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
private var cachedPartitionSpec: PartitionSpec = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 428032b1fb..f8b7a7f8ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -544,7 +544,28 @@ object SQLConf {
.internal()
.doc("How long that a file is guaranteed to be visible for all readers.")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefault(60 * 1000L) // 10 minutes
+ .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
+
+ val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion")
+ .internal()
+ .doc("Whether to delete the expired log files in file stream source.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val FILE_SOURCE_LOG_COMPACT_INTERVAL =
+ SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval")
+ .internal()
+ .doc("Number of log files after which all the previous files " +
+ "are compacted into the next log file.")
+ .intConf
+ .createWithDefault(10)
+
+ val FILE_SOURCE_LOG_CLEANUP_DELAY =
+ SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay")
+ .internal()
+ .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
val STREAMING_SCHEMA_INFERENCE =
SQLConfigBuilder("spark.sql.streaming.schemaInference")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 26f8b98cb3..41a8cc2400 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -25,13 +25,14 @@ import org.apache.spark.sql.test.SharedSQLContext
class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
+ import CompactibleFileStreamLog._
import FileStreamSinkLog._
test("getBatchIdFromFileName") {
assert(1234L === getBatchIdFromFileName("1234"))
assert(1234L === getBatchIdFromFileName("1234.compact"))
intercept[NumberFormatException] {
- FileStreamSinkLog.getBatchIdFromFileName("1234a")
+ getBatchIdFromFileName("1234a")
}
}
@@ -83,17 +84,19 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
test("compactLogs") {
- val logs = Seq(
- newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
- assert(logs === compactLogs(logs))
+ withFileStreamSinkLog { sinkLog =>
+ val logs = Seq(
+ newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
+ assert(logs === sinkLog.compactLogs(logs))
- val logs2 = Seq(
- newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
- newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
- assert(logs.dropRight(1) ++ logs2.dropRight(1) === compactLogs(logs ++ logs2))
+ val logs2 = Seq(
+ newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
+ assert(logs.dropRight(1) ++ logs2.dropRight(1) === sinkLog.compactLogs(logs ++ logs2))
+ }
}
test("serialize") {
@@ -125,21 +128,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
action = FileStreamSinkLog.ADD_ACTION))
// scalastyle:off
- val expected = s"""${FileStreamSinkLog.VERSION}
+ val expected = s"""$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
// scalastyle:on
assert(expected === new String(sinkLog.serialize(logs), UTF_8))
- assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
+ assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8))
}
}
test("deserialize") {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
- val logs = s"""${FileStreamSinkLog.VERSION}
+ val logs = s"""$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -173,7 +176,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
- assert(Nil === sinkLog.deserialize(FileStreamSinkLog.VERSION.getBytes(UTF_8)))
+ assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8)))
}
}
@@ -263,7 +266,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
withTempDir { file =>
- val sinkLog = new FileStreamSinkLog(spark, file.getCanonicalPath)
+ val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
f(sinkLog)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a02a36c004..55c95ae285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
import java.io.File
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class FileStreamSourceTest extends StreamTest with SharedSQLContext {
+class FileStreamSourceTest extends StreamTest with SharedSQLContext with PrivateMethodTester {
import testImplicits._
@@ -804,6 +804,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
+
+ test("compacat metadata log") {
+ val _sources = PrivateMethod[Seq[Source]]('sources)
+ val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
+
+ def verify(execution: StreamExecution)
+ (batchId: Long, expectedBatches: Int): Boolean = {
+ import CompactibleFileStreamLog._
+
+ val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+ val metadataLog = fileSource invokePrivate _metadataLog()
+
+ if (isCompactionBatch(batchId, 2)) {
+ val path = metadataLog.batchIdToPath(batchId)
+
+ // Assert that the path name ends with the compact suffix.
+ assert(path.getName.endsWith(COMPACT_FILE_SUFFIX))
+
+ // Compacted batch should include all entries from start.
+ val entries = metadataLog.get(batchId)
+ assert(entries.isDefined)
+ assert(entries.get.length === metadataLog.allFiles().length)
+ assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length)
+ }
+
+ assert(metadataLog.allFiles().sortBy(_.batchId) ===
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId))
+
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches
+ }
+
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2"
+ ) {
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ AssertOnQuery(verify(_)(0L, 1)),
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AssertOnQuery(verify(_)(1L, 2)),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
+ AssertOnQuery(verify(_)(2L, 3)),
+ StopStream,
+ StartStream(),
+ AssertOnQuery(verify(_)(2L, 3)),
+ AddTextFileData("drop10\nkeep11", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
+ AssertOnQuery(verify(_)(3L, 4)),
+ AddTextFileData("drop12\nkeep13", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"),
+ AssertOnQuery(verify(_)(4L, 5))
+ )
+ }
+ }
+ }
+
+ test("get arbitrary batch from FileStreamSource") {
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+ // Force deleting the old logs
+ SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+ ) {
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("keep1", src, tmp),
+ CheckAnswer("keep1"),
+ AddTextFileData("keep2", src, tmp),
+ CheckAnswer("keep1", "keep2"),
+ AddTextFileData("keep3", src, tmp),
+ CheckAnswer("keep1", "keep2", "keep3"),
+ AssertOnQuery("check getBatch") { execution: StreamExecution =>
+ val _sources = PrivateMethod[Seq[Source]]('sources)
+ val fileSource =
+ (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+ assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() ===
+ List("keep1", "keep2", "keep3"))
+ assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() ===
+ List("keep2", "keep3"))
+ assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() ===
+ List("keep3"))
+ true
+ }
+ )
+ }
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {