Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala  212
1 file changed, 21 insertions, 191 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 6f9f7c18c4..64f2f00484 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,10 +17,7 @@
package org.apache.spark.sql.execution.streaming
-import java.io.IOException
-import java.nio.charset.StandardCharsets.UTF_8
-
-import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
@@ -79,213 +76,46 @@ object SinkFileStatus {
* When the reader uses `allFiles` to list all files, this method only returns the visible files
* (drops the deleted files).
*/
-class FileStreamSinkLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
-
- import FileStreamSinkLog._
+class FileStreamSinkLog(
+ metadataLogVersion: String,
+ sparkSession: SparkSession,
+ path: String)
+ extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
private implicit val formats = Serialization.formats(NoTypeHints)
- /**
- * If we delete the old files after compaction at once, there is a race condition in S3: other
- * processes may see the old files are deleted but still cannot see the compaction file using
- * "list". The `allFiles` handles this by looking for the next compaction file directly, however,
- a live lock may happen if the compaction happens too frequently: one process keeps deleting
- * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
- */
- private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
+ protected override val fileCleanupDelayMs =
+ sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
- private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
+ protected override val isDeletingExpiredLog =
+ sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
- private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
+ protected override val compactInterval =
+ sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
- override def batchIdToPath(batchId: Long): Path = {
- if (isCompactionBatch(batchId, compactInterval)) {
- new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
- } else {
- new Path(metadataPath, batchId.toString)
- }
- }
-
- override def pathToBatchId(path: Path): Long = {
- getBatchIdFromFileName(path.getName)
- }
-
- override def isBatchFile(path: Path): Boolean = {
- try {
- getBatchIdFromFileName(path.getName)
- true
- } catch {
- case _: NumberFormatException => false
- }
- }
-
- override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
- (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
+ protected override def serializeData(data: SinkFileStatus): String = {
+ write(data)
}
- override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
- val lines = new String(bytes, UTF_8).split("\n")
- if (lines.length == 0) {
- throw new IllegalStateException("Incomplete log file")
- }
- val version = lines(0)
- if (version != VERSION) {
- throw new IllegalStateException(s"Unknown log version: ${version}")
- }
- lines.slice(1, lines.length).map(read[SinkFileStatus](_))
- }
-
- override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
- if (isCompactionBatch(batchId, compactInterval)) {
- compact(batchId, logs)
- } else {
- super.add(batchId, logs)
- }
+ protected override def deserializeData(encodedString: String): SinkFileStatus = {
+ read[SinkFileStatus](encodedString)
}
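Sketch (not part of this patch) of the one-JSON-object-per-line format that serializeData/deserializeData handle, using json4s exactly as above; FileEntry is a hypothetical, simplified stand-in for SinkFileStatus, whose full field list is outside this hunk.

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}

// Hypothetical, simplified stand-in for SinkFileStatus: one JSON object per metadata log line.
case class FileEntry(path: String, size: Long, action: String)

object JsonLineRoundTrip {
  private implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val line = write(FileEntry("/data/out/part-00000", 1024L, "add"))
    // line is e.g. {"path":"/data/out/part-00000","size":1024,"action":"add"}
    val entry = read[FileEntry](line)
    assert(entry == FileEntry("/data/out/part-00000", 1024L, "add"))
    println(line)
  }
}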
- /**
- * Returns all files except the deleted ones.
- */
- def allFiles(): Array[SinkFileStatus] = {
- var latestId = getLatest().map(_._1).getOrElse(-1L)
- // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
- // is calling this method. This loop will retry the reading to deal with the
- // race condition.
- while (true) {
- if (latestId >= 0) {
- val startId = getAllValidBatches(latestId, compactInterval)(0)
- try {
- val logs = get(Some(startId), Some(latestId)).flatMap(_._2)
- return compactLogs(logs).toArray
- } catch {
- case e: IOException =>
- // Another process using `FileStreamSink` may delete the batch files when
- // `StreamFileCatalog` is reading. However, it only happens when a compaction is
- // deleting old files. If so, let's try the next compaction batch and we should find it.
- // Otherwise, this is a real IO issue and we should throw it.
- latestId = nextCompactionBatchId(latestId, compactInterval)
- get(latestId).getOrElse {
- throw e
- }
- }
- } else {
- return Array.empty
- }
- }
- Array.empty
- }
-
- /**
- * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
- * corresponding `batchId` file. It will delete expired files as well if enabled.
- */
- private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
- val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
- val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
- if (super.add(batchId, compactLogs(allLogs).toArray)) {
- if (isDeletingExpiredLog) {
- deleteExpiredLog(batchId)
- }
- true
+ override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
+ val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
+ if (deletedFiles.isEmpty) {
+ logs
} else {
- // Return false as there is another writer.
- false
- }
- }
-
- /**
- * Since all logs before `compactionBatchId` are compacted and written into the
- * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
- * S3, the compaction file may not be seen by other processes at once. So we only delete files
- created at least `fileCleanupDelayMs` milliseconds ago.
- */
- private def deleteExpiredLog(compactionBatchId: Long): Unit = {
- val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
- fileManager.list(metadataPath, new PathFilter {
- override def accept(path: Path): Boolean = {
- try {
- val batchId = getBatchIdFromFileName(path.getName)
- batchId < compactionBatchId
- } catch {
- case _: NumberFormatException =>
- false
- }
- }
- }).foreach { f =>
- if (f.getModificationTime <= expiredTime) {
- fileManager.delete(f.getPath)
- }
+ logs.filter(f => !deletedFiles.contains(f.path))
}
}
}
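Sketch (not part of this patch) of the compactLogs contract kept in the new implementation above: every entry whose path later shows up with the delete action is dropped. Entry is a hypothetical stand-in for SinkFileStatus.

object CompactLogsSketch {
  // Hypothetical stand-in with just the fields compactLogs cares about.
  case class Entry(path: String, action: String)

  // Same shape as FileStreamSinkLog.compactLogs: drop every entry whose path was deleted.
  def compactLogs(logs: Seq[Entry]): Seq[Entry] = {
    val deleted = logs.filter(_.action == "delete").map(_.path).toSet
    if (deleted.isEmpty) logs else logs.filterNot(e => deleted.contains(e.path))
  }

  def main(args: Array[String]): Unit = {
    val logs = Seq(Entry("a", "add"), Entry("b", "add"), Entry("a", "delete"))
    // "a" was added and later deleted, so only "b" survives compaction.
    assert(compactLogs(logs) == Seq(Entry("b", "add")))
  }
}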
object FileStreamSinkLog {
val VERSION = "v1"
- val COMPACT_FILE_SUFFIX = ".compact"
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
-
- def getBatchIdFromFileName(fileName: String): Long = {
- fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
- }
-
- /**
- * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every
- * `compactInterval` commits.
- *
- * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
- */
- def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
- (batchId + 1) % compactInterval == 0
- }
-
- /**
- * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
- * need to do a new compaction.
- *
- * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should return
- * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
- */
- def getValidBatchesBeforeCompactionBatch(
- compactionBatchId: Long,
- compactInterval: Int): Seq[Long] = {
- assert(isCompactionBatch(compactionBatchId, compactInterval),
- s"$compactionBatchId is not a compaction batch")
- (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
- }
-
- /**
- * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just
- * return itself. Otherwise, it will find the previous compaction batch and return all batches
- * between it and `batchId`.
- */
- def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
- assert(batchId >= 0)
- val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
- start to batchId
- }
-
- /**
- * Removes all deleted files from logs. It assumes once one file is deleted, it won't be added to
- * the log in future.
- */
- def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
- val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet
- if (deletedFiles.isEmpty) {
- logs
- } else {
- logs.filter(f => !deletedFiles.contains(f.path))
- }
- }
-
- /**
- * Returns the next compaction batch id after `batchId`.
- */
- def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
- (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
- }
}
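Sketch (not part of this patch) of the batch-id arithmetic this change moves out of FileStreamSinkLog into CompactibleFileStreamLog; the formulas are copied verbatim from the removed helpers above and checked for compactInterval = 3.

object CompactionBatchMathSketch {
  // With compactInterval = 3, batches 2, 5, 8, ... are compaction batches.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // All batches whose logs are still needed to read up to `batchId` (inclusive).
  def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
    val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
    start to batchId
  }

  // The first compaction batch strictly after `batchId`.
  def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long =
    (batchId + compactInterval + 1) / compactInterval * compactInterval - 1

  def main(args: Array[String]): Unit = {
    assert(Seq(2L, 5L, 8L).forall(isCompactionBatch(_, 3)))
    assert(getAllValidBatches(4, 3) == Seq(2L, 3L, 4L)) // previous compaction batch 2, plus 3 and 4
    assert(nextCompactionBatchId(2, 3) == 5)
  }
}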