Diffstat (limited to 'sql')
6 files changed, 616 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index f3c1cc5ef5..4f722a514b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -44,28 +44,33 @@ class FileStreamSink( private val basePath = new Path(path) private val logPath = new Path(basePath, FileStreamSink.metadataDir) - private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString) + private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString) + private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) override def addBatch(batchId: Long, data: DataFrame): Unit = { - if (fileLog.get(batchId).isDefined) { + if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { logInfo(s"Skipping already committed batch $batchId") } else { - val files = writeFiles(data) + val files = fs.listStatus(writeFiles(data)).map { f => + SinkFileStatus( + path = f.getPath.toUri.toString, + size = f.getLen, + isDir = f.isDirectory, + modificationTime = f.getModificationTime, + blockReplication = f.getReplication, + blockSize = f.getBlockSize, + action = FileStreamSinkLog.ADD_ACTION) + } if (fileLog.add(batchId, files)) { logInfo(s"Committed batch $batchId") } else { - logWarning(s"Race while writing batch $batchId") + throw new IllegalStateException(s"Race while writing batch $batchId") } } } /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of files paths. */ - private def writeFiles(data: DataFrame): Seq[String] = { - val ctx = sqlContext - val outputDir = path - val format = fileFormat - val schema = data.schema - + private def writeFiles(data: DataFrame): Array[Path] = { val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString data.write.parquet(file) sqlContext.read @@ -74,7 +79,6 @@ class FileStreamSink( .inputFiles .map(new Path(_)) .filterNot(_.getName.startsWith("_")) - .map(_.toUri.toString) } override def toString: String = s"FileSink[$path]" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala new file mode 100644 index 0000000000..6c5449a928 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.IOException +import java.nio.charset.StandardCharsets.UTF_8 + +import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization +import org.json4s.jackson.Serialization.{read, write} + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.internal.SQLConf + +/** + * The status of a file outputted by [[FileStreamSink]]. A file is visible only if it appears in + * the sink log and its action is not "delete". + * + * @param path the file path. + * @param size the file size. + * @param isDir whether this file is a directory. + * @param modificationTime the file's last modification time. + * @param blockReplication the block replication. + * @param blockSize the block size. + * @param action the file action. Must be either "add" or "delete". + */ +case class SinkFileStatus( + path: String, + size: Long, + isDir: Boolean, + modificationTime: Long, + blockReplication: Int, + blockSize: Long, + action: String) { + + def toFileStatus: FileStatus = { + new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path)) + } +} + +/** + * A special log for [[FileStreamSink]]. It will write one log file for each batch. The first line + * of a log file is the version number, followed by one JSON line per file; each JSON line is the + * JSON representation of a [[SinkFileStatus]]. + * + * As reading from many small files is usually pretty slow, [[FileStreamSinkLog]] will compact log + * files every "spark.sql.streaming.fileSink.log.compactInterval" batches into a big file. When + * doing a compaction, it will read all old log files and merge them with the new batch. During the + * compaction, it will also drop the entries of files that have been deleted (those whose + * [[SinkFileStatus.action]] is "delete"). When a reader uses `allFiles` to list all files, only + * the visible files are returned (the deleted files are dropped). + */ +class FileStreamSinkLog(sqlContext: SQLContext, path: String) + extends HDFSMetadataLog[Seq[SinkFileStatus]](sqlContext, path) { + + import FileStreamSinkLog._ + + private implicit val formats = Serialization.formats(NoTypeHints) + + /** + * If we deleted the old files right after a compaction, there would be a race condition in S3: + * other processes might see that the old files are deleted but still not see the compaction file + * via "list". `allFiles` handles this by looking for the next compaction file directly; however, + * a livelock may happen if compaction happens too frequently: one process keeps deleting + * old files while another one keeps retrying. Setting a reasonable cleanup delay avoids this.
+ */ + private val fileCleanupDelayMs = sqlContext.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY) + + private val isDeletingExpiredLog = sqlContext.getConf(SQLConf.FILE_SINK_LOG_DELETION) + + private val compactInterval = sqlContext.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, + s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + + "to a positive value.") + + override def batchIdToPath(batchId: Long): Path = { + if (isCompactionBatch(batchId, compactInterval)) { + new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX") + } else { + new Path(metadataPath, batchId.toString) + } + } + + override def pathToBatchId(path: Path): Long = { + getBatchIdFromFileName(path.getName) + } + + override def isBatchFile(path: Path): Boolean = { + try { + getBatchIdFromFileName(path.getName) + true + } catch { + case _: NumberFormatException => false + } + } + + override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = { + (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8) + } + + override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = { + val lines = new String(bytes, UTF_8).split("\n") + if (lines.length == 0) { + throw new IllegalStateException("Incomplete log file") + } + val version = lines(0) + if (version != VERSION) { + throw new IllegalStateException(s"Unknown log version: ${version}") + } + lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_)) + } + + override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = { + if (isCompactionBatch(batchId, compactInterval)) { + compact(batchId, logs) + } else { + super.add(batchId, logs) + } + } + + /** + * Returns all files except the deleted ones. + */ + def allFiles(): Array[SinkFileStatus] = { + var latestId = getLatest().map(_._1).getOrElse(-1L) + // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog` + // is calling this method. This loop will retry the reading to deal with the + // race condition. + while (true) { + if (latestId >= 0) { + val startId = getAllValidBatches(latestId, compactInterval)(0) + try { + val logs = get(Some(startId), Some(latestId)).flatMap(_._2) + return compactLogs(logs).toArray + } catch { + case e: IOException => + // Another process using `FileStreamSink` may delete the batch files when + // `StreamFileCatalog` are reading. However, it only happens when a compaction is + // deleting old files. If so, let's try the next compaction batch and we should find it. + // Otherwise, this is a real IO issue and we should throw it. + latestId = nextCompactionBatchId(latestId, compactInterval) + get(latestId).getOrElse { + throw e + } + } + } else { + return Array.empty + } + } + Array.empty + } + + /** + * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the + * corresponding `batchId` file. It will delete expired files as well if enabled. + */ + private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = { + val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) + val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs + if (super.add(batchId, compactLogs(allLogs))) { + if (isDeletingExpiredLog) { + deleteExpiredLog(batchId) + } + true + } else { + // Return false as there is another writer. + false + } + } + + /** + * Since all logs before `compactionBatchId` are compacted and written into the + * `compactionBatchId` log file, they can be removed. 
However, due to the eventual consistency of + * S3, the compaction file may not be seen by other processes immediately. So we only delete files + * that were created more than `fileCleanupDelayMs` milliseconds ago. + */ + private def deleteExpiredLog(compactionBatchId: Long): Unit = { + val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs + fileManager.list(metadataPath, new PathFilter { + override def accept(path: Path): Boolean = { + try { + val batchId = getBatchIdFromFileName(path.getName) + batchId < compactionBatchId + } catch { + case _: NumberFormatException => + false + } + } + }).foreach { f => + if (f.getModificationTime <= expiredTime) { + fileManager.delete(f.getPath) + } + } + } +} + +object FileStreamSinkLog { + val VERSION = "v1" + val COMPACT_FILE_SUFFIX = ".compact" + val DELETE_ACTION = "delete" + val ADD_ACTION = "add" + + def getBatchIdFromFileName(fileName: String): Long = { + fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong + } + + /** + * Returns whether this is a compaction batch. FileStreamSinkLog will compact old logs every + * `compactInterval` commits. + * + * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches. + */ + def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = { + (batchId + 1) % compactInterval == 0 + } + + /** + * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we + * need to do a new compaction. + * + * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method returns + * `Seq(2, 3, 4)` (note that it includes the previous compaction batch 2). + */ + def getValidBatchesBeforeCompactionBatch( + compactionBatchId: Long, + compactInterval: Int): Seq[Long] = { + assert(isCompactionBatch(compactionBatchId, compactInterval), + s"$compactionBatchId is not a compaction batch") + (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId + } + + /** + * Returns all necessary logs up to `batchId` (inclusive). If `batchId` is a compaction batch, + * just return it. Otherwise, it will find the previous compaction batch and return all batches + * from it to `batchId` (both inclusive). + */ + def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = { + assert(batchId >= 0) + val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1) + start to batchId + } + + /** + * Removes all deleted files from the logs. It assumes that once a file is deleted, it won't be + * added back to the log in the future. + */ + def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { + val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet + if (deletedFiles.isEmpty) { + logs + } else { + logs.filter(f => !deletedFiles.contains(f.path)) + } + } + + /** + * Returns the next compaction batch id after `batchId`.
+ */ + def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = { + (batchId + compactInterval + 1) / compactInterval * compactInterval - 1 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 9663fee18d..b52f7a28b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -51,8 +51,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) import HDFSMetadataLog._ - private val metadataPath = new Path(path) - private val fileManager = createFileManager() + val metadataPath = new Path(path) + protected val fileManager = createFileManager() if (!fileManager.exists(metadataPath)) { fileManager.mkdirs(metadataPath) @@ -62,7 +62,21 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) * A `PathFilter` to filter only batch files */ private val batchFilesFilter = new PathFilter { - override def accept(path: Path): Boolean = try { + override def accept(path: Path): Boolean = isBatchFile(path) + } + + private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance() + + protected def batchIdToPath(batchId: Long): Path = { + new Path(metadataPath, batchId.toString) + } + + protected def pathToBatchId(path: Path) = { + path.getName.toLong + } + + protected def isBatchFile(path: Path) = { + try { path.getName.toLong true } catch { @@ -70,18 +84,19 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) } } - private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance() + protected def serialize(metadata: T): Array[Byte] = { + JavaUtils.bufferToArray(serializer.serialize(metadata)) + } - private def batchFile(batchId: Long): Path = { - new Path(metadataPath, batchId.toString) + protected def deserialize(bytes: Array[Byte]): T = { + serializer.deserialize[T](ByteBuffer.wrap(bytes)) } override def add(batchId: Long, metadata: T): Boolean = { get(batchId).map(_ => false).getOrElse { // Only write metadata when the batch has not yet been written. 
- val buffer = serializer.serialize(metadata) try { - writeBatch(batchId, JavaUtils.bufferToArray(buffer)) + writeBatch(batchId, serialize(metadata)) true } catch { case e: IOException if "java.lang.InterruptedException" == e.getMessage => @@ -113,8 +128,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) try { // Try to commit the batch // It will fail if there is an existing file (someone has committed the batch) - logDebug(s"Attempting to write log #${batchFile(batchId)}") - fileManager.rename(tempPath, batchFile(batchId)) + logDebug(s"Attempting to write log #${batchIdToPath(batchId)}") + fileManager.rename(tempPath, batchIdToPath(batchId)) return } catch { case e: IOException if isFileAlreadyExistsException(e) => @@ -158,11 +173,11 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) } override def get(batchId: Long): Option[T] = { - val batchMetadataFile = batchFile(batchId) + val batchMetadataFile = batchIdToPath(batchId) if (fileManager.exists(batchMetadataFile)) { val input = fileManager.open(batchMetadataFile) val bytes = IOUtils.toByteArray(input) - Some(serializer.deserialize[T](ByteBuffer.wrap(bytes))) + Some(deserialize(bytes)) } else { logDebug(s"Unable to find batch $batchMetadataFile") None @@ -172,7 +187,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = { val files = fileManager.list(metadataPath, batchFilesFilter) val batchIds = files - .map(_.getPath.getName.toLong) + .map(f => pathToBatchId(f.getPath)) .filter { batchId => (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get) } @@ -184,7 +199,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) override def getLatest(): Option[(Long, T)] = { val batchIds = fileManager.list(metadataPath, batchFilesFilter) - .map(_.getPath.getName.toLong) + .map(f => pathToBatchId(f.getPath)) .sorted .reverse for (batchId <- batchIds) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala index b1f93a9159..95b5129351 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types.StructType class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging { val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") - val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString) + val metadataLog = new FileStreamSinkLog(sqlContext, metadataDirectory.toUri.toString) val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) override def paths: Seq[Path] = path :: Nil @@ -53,6 +53,6 @@ class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog override def refresh(): Unit = {} override def allFiles(): Seq[FileStatus] = { - fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_))) + metadataLog.allFiles().map(_.toFileStatus) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4ae8278a9d..80e2c1986d 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.internal import java.util.{NoSuchElementException, Properties} +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.immutable @@ -463,6 +464,27 @@ object SQLConf { .booleanConf .createWithDefault(false) + val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") + .internal() + .doc("Whether to delete the expired log files in the file stream sink.") + .booleanConf + .createWithDefault(true) + + val FILE_SINK_LOG_COMPACT_INTERVAL = + SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval") + .internal() + .doc("Number of log files after which all the previous files " + + "are compacted into the next log file.") + .intConf + .createWithDefault(10) + + val FILE_SINK_LOG_CLEANUP_DELAY = + SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay") + .internal() + .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(60 * 1000L) // 1 minute + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala new file mode 100644 index 0000000000..70c2a82990 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.nio.charset.StandardCharsets.UTF_8 + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { + + import FileStreamSinkLog._ + + test("getBatchIdFromFileName") { + assert(1234L === getBatchIdFromFileName("1234")) + assert(1234L === getBatchIdFromFileName("1234.compact")) + intercept[NumberFormatException] { + FileStreamSinkLog.getBatchIdFromFileName("1234a") + } + } + + test("isCompactionBatch") { + assert(false === isCompactionBatch(0, compactInterval = 3)) + assert(false === isCompactionBatch(1, compactInterval = 3)) + assert(true === isCompactionBatch(2, compactInterval = 3)) + assert(false === isCompactionBatch(3, compactInterval = 3)) + assert(false === isCompactionBatch(4, compactInterval = 3)) + assert(true === isCompactionBatch(5, compactInterval = 3)) + } + + test("nextCompactionBatchId") { + assert(2 === nextCompactionBatchId(0, compactInterval = 3)) + assert(2 === nextCompactionBatchId(1, compactInterval = 3)) + assert(5 === nextCompactionBatchId(2, compactInterval = 3)) + assert(5 === nextCompactionBatchId(3, compactInterval = 3)) + assert(5 === nextCompactionBatchId(4, compactInterval = 3)) + assert(8 === nextCompactionBatchId(5, compactInterval = 3)) + } + + test("getValidBatchesBeforeCompactionBatch") { + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(0, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(1, compactInterval = 3) + } + assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3)) + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(3, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(4, compactInterval = 3) + } + assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3)) + } + + test("getAllValidBatches") { + assert(Seq(0) === getAllValidBatches(0, compactInterval = 3)) + assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3)) + assert(Seq(2) === getAllValidBatches(2, compactInterval = 3)) + assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3)) + assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3)) + assert(Seq(5) === getAllValidBatches(5, compactInterval = 3)) + assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3)) + assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3)) + assert(Seq(8) === getAllValidBatches(8, compactInterval = 3)) + } + + test("compactLogs") { + val logs = Seq( + newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION)) + assert(logs === compactLogs(logs)) + + val logs2 = Seq( + newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION), + newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION)) + assert(logs.dropRight(1) ++ logs2.dropRight(1) === compactLogs(logs ++ logs2)) + } + + test("serialize") { + withFileStreamSinkLog { sinkLog => + val logs = Seq( + SinkFileStatus( + path = "/a/b/x", + size = 100L, + isDir = false, + modificationTime = 1000L, + blockReplication = 1, + blockSize = 10000L, + action = FileStreamSinkLog.ADD_ACTION), + SinkFileStatus( + path = "/a/b/y", + size 
= 200L, + isDir = false, + modificationTime = 2000L, + blockReplication = 2, + blockSize = 20000L, + action = FileStreamSinkLog.DELETE_ACTION), + SinkFileStatus( + path = "/a/b/z", + size = 300L, + isDir = false, + modificationTime = 3000L, + blockReplication = 3, + blockSize = 30000L, + action = FileStreamSinkLog.ADD_ACTION)) + + // scalastyle:off + val expected = s"""${FileStreamSinkLog.VERSION} + |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} + |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} + |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin + // scalastyle:on + assert(expected === new String(sinkLog.serialize(logs), UTF_8)) + + assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8)) + } + } + + test("deserialize") { + withFileStreamSinkLog { sinkLog => + // scalastyle:off + val logs = s"""${FileStreamSinkLog.VERSION} + |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} + |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} + |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin + // scalastyle:on + + val expected = Seq( + SinkFileStatus( + path = "/a/b/x", + size = 100L, + isDir = false, + modificationTime = 1000L, + blockReplication = 1, + blockSize = 10000L, + action = FileStreamSinkLog.ADD_ACTION), + SinkFileStatus( + path = "/a/b/y", + size = 200L, + isDir = false, + modificationTime = 2000L, + blockReplication = 2, + blockSize = 20000L, + action = FileStreamSinkLog.DELETE_ACTION), + SinkFileStatus( + path = "/a/b/z", + size = 300L, + isDir = false, + modificationTime = 3000L, + blockReplication = 3, + blockSize = 30000L, + action = FileStreamSinkLog.ADD_ACTION)) + + assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8))) + + assert(Nil === sinkLog.deserialize(FileStreamSinkLog.VERSION.getBytes(UTF_8))) + } + } + + test("batchIdToPath") { + withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { + withFileStreamSinkLog { sinkLog => + assert("0" === sinkLog.batchIdToPath(0).getName) + assert("1" === sinkLog.batchIdToPath(1).getName) + assert("2.compact" === sinkLog.batchIdToPath(2).getName) + assert("3" === sinkLog.batchIdToPath(3).getName) + assert("4" === sinkLog.batchIdToPath(4).getName) + assert("5.compact" === sinkLog.batchIdToPath(5).getName) + } + } + } + + test("compact") { + withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { + withFileStreamSinkLog { sinkLog => + for (batchId <- 0 to 10) { + sinkLog.add( + batchId, + Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION))) + val expectedFiles = (0 to batchId).map { + id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION) + } + assert(sinkLog.allFiles() === expectedFiles) + if (isCompactionBatch(batchId, 3)) { + // Since batchId is a compaction batch, the batch log file should contain all logs + assert(sinkLog.get(batchId).getOrElse(Nil) === expectedFiles) + } + } + } + } + } + + test("delete expired file") { + // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour + // deterministically + withSQLConf( + SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> 
"3", + SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") { + withFileStreamSinkLog { sinkLog => + val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + + def listBatchFiles(): Set[String] = { + fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName => + try { + getBatchIdFromFileName(fileName) + true + } catch { + case _: NumberFormatException => false + } + }.toSet + } + + sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION))) + assert(Set("0") === listBatchFiles()) + sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION))) + assert(Set("0", "1") === listBatchFiles()) + sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION))) + assert(Set("2.compact") === listBatchFiles()) + sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION))) + assert(Set("2.compact", "3") === listBatchFiles()) + sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION))) + assert(Set("2.compact", "3", "4") === listBatchFiles()) + sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION))) + assert(Set("5.compact") === listBatchFiles()) + } + } + } + + /** + * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields + * in SinkFileStatus. + */ + private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = { + SinkFileStatus( + path = path, + size = 100L, + isDir = false, + modificationTime = 100L, + blockReplication = 1, + blockSize = 100L, + action = action) + } + + private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = { + withTempDir { file => + val sinkLog = new FileStreamSinkLog(sqlContext, file.getCanonicalPath) + f(sinkLog) + } + } +} |