diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala | 212 |
1 files changed, 21 insertions, 191 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 6f9f7c18c4..64f2f00484 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -17,10 +17,7 @@ package org.apache.spark.sql.execution.streaming -import java.io.IOException -import java.nio.charset.StandardCharsets.UTF_8 - -import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.{read, write} @@ -79,213 +76,46 @@ object SinkFileStatus { * When the reader uses `allFiles` to list all files, this method only returns the visible files * (drops the deleted files). */ -class FileStreamSinkLog(sparkSession: SparkSession, path: String) - extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) { - - import FileStreamSinkLog._ +class FileStreamSinkLog( + metadataLogVersion: String, + sparkSession: SparkSession, + path: String) + extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) { private implicit val formats = Serialization.formats(NoTypeHints) - /** - * If we delete the old files after compaction at once, there is a race condition in S3: other - * processes may see the old files are deleted but still cannot see the compaction file using - * "list". The `allFiles` handles this by looking for the next compaction file directly, however, - * a live lock may happen if the compaction happens too frequently: one processing keeps deleting - * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it. - */ - private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay + protected override val fileCleanupDelayMs = + sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY) - private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion + protected override val isDeletingExpiredLog = + sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION) - private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL) require(compactInterval > 0, s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") - override def batchIdToPath(batchId: Long): Path = { - if (isCompactionBatch(batchId, compactInterval)) { - new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX") - } else { - new Path(metadataPath, batchId.toString) - } - } - - override def pathToBatchId(path: Path): Long = { - getBatchIdFromFileName(path.getName) - } - - override def isBatchFile(path: Path): Boolean = { - try { - getBatchIdFromFileName(path.getName) - true - } catch { - case _: NumberFormatException => false - } - } - - override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = { - (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8) + protected override def serializeData(data: SinkFileStatus): String = { + write(data) } - override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = { - val lines = new String(bytes, UTF_8).split("\n") - if (lines.length == 0) { - throw new IllegalStateException("Incomplete log file") - } - val version = lines(0) - if (version != VERSION) { - throw new IllegalStateException(s"Unknown log version: ${version}") - } - lines.slice(1, lines.length).map(read[SinkFileStatus](_)) - } - - override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = { - if (isCompactionBatch(batchId, compactInterval)) { - compact(batchId, logs) - } else { - super.add(batchId, logs) - } + protected override def deserializeData(encodedString: String): SinkFileStatus = { + read[SinkFileStatus](encodedString) } - /** - * Returns all files except the deleted ones. - */ - def allFiles(): Array[SinkFileStatus] = { - var latestId = getLatest().map(_._1).getOrElse(-1L) - // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog` - // is calling this method. This loop will retry the reading to deal with the - // race condition. - while (true) { - if (latestId >= 0) { - val startId = getAllValidBatches(latestId, compactInterval)(0) - try { - val logs = get(Some(startId), Some(latestId)).flatMap(_._2) - return compactLogs(logs).toArray - } catch { - case e: IOException => - // Another process using `FileStreamSink` may delete the batch files when - // `StreamFileCatalog` are reading. However, it only happens when a compaction is - // deleting old files. If so, let's try the next compaction batch and we should find it. - // Otherwise, this is a real IO issue and we should throw it. - latestId = nextCompactionBatchId(latestId, compactInterval) - get(latestId).getOrElse { - throw e - } - } - } else { - return Array.empty - } - } - Array.empty - } - - /** - * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the - * corresponding `batchId` file. It will delete expired files as well if enabled. - */ - private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = { - val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) - val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs - if (super.add(batchId, compactLogs(allLogs).toArray)) { - if (isDeletingExpiredLog) { - deleteExpiredLog(batchId) - } - true + override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { + val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet + if (deletedFiles.isEmpty) { + logs } else { - // Return false as there is another writer. - false - } - } - - /** - * Since all logs before `compactionBatchId` are compacted and written into the - * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of - * S3, the compaction file may not be seen by other processes at once. So we only delete files - * created `fileCleanupDelayMs` milliseconds ago. - */ - private def deleteExpiredLog(compactionBatchId: Long): Unit = { - val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs - fileManager.list(metadataPath, new PathFilter { - override def accept(path: Path): Boolean = { - try { - val batchId = getBatchIdFromFileName(path.getName) - batchId < compactionBatchId - } catch { - case _: NumberFormatException => - false - } - } - }).foreach { f => - if (f.getModificationTime <= expiredTime) { - fileManager.delete(f.getPath) - } + logs.filter(f => !deletedFiles.contains(f.path)) } } } object FileStreamSinkLog { val VERSION = "v1" - val COMPACT_FILE_SUFFIX = ".compact" val DELETE_ACTION = "delete" val ADD_ACTION = "add" - - def getBatchIdFromFileName(fileName: String): Long = { - fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong - } - - /** - * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every - * `compactInterval` commits. - * - * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches. - */ - def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = { - (batchId + 1) % compactInterval == 0 - } - - /** - * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we - * need to do a new compaction. - * - * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns - * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2). - */ - def getValidBatchesBeforeCompactionBatch( - compactionBatchId: Long, - compactInterval: Int): Seq[Long] = { - assert(isCompactionBatch(compactionBatchId, compactInterval), - s"$compactionBatchId is not a compaction batch") - (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId - } - - /** - * Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just - * return itself. Otherwise, it will find the previous compaction batch and return all batches - * between it and `batchId`. - */ - def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = { - assert(batchId >= 0) - val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1) - start to batchId - } - - /** - * Removes all deleted files from logs. It assumes once one file is deleted, it won't be added to - * the log in future. - */ - def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { - val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet - if (deletedFiles.isEmpty) { - logs - } else { - logs.filter(f => !deletedFiles.contains(f.path)) - } - } - - /** - * Returns the next compaction batch id after `batchId`. - */ - def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = { - (batchId + compactInterval + 1) / compactInterval * compactInterval - 1 - } } |