 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala         |  26
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala      | 278
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala        |  43
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala      |   4
 sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                           |  22
 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala | 270
 6 files changed, 616 insertions(+), 27 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index f3c1cc5ef5..4f722a514b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -44,28 +44,33 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
- private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
+ private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString)
+ private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
- if (fileLog.get(batchId).isDefined) {
+ if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
} else {
- val files = writeFiles(data)
+ val files = fs.listStatus(writeFiles(data)).map { f =>
+ SinkFileStatus(
+ path = f.getPath.toUri.toString,
+ size = f.getLen,
+ isDir = f.isDirectory,
+ modificationTime = f.getModificationTime,
+ blockReplication = f.getReplication,
+ blockSize = f.getBlockSize,
+ action = FileStreamSinkLog.ADD_ACTION)
+ }
if (fileLog.add(batchId, files)) {
logInfo(s"Committed batch $batchId")
} else {
- logWarning(s"Race while writing batch $batchId")
+ throw new IllegalStateException(s"Race while writing batch $batchId")
}
}
}
/** Writes the [[DataFrame]] to a UUID-named dir, returning the list of file paths. */
- private def writeFiles(data: DataFrame): Seq[String] = {
- val ctx = sqlContext
- val outputDir = path
- val format = fileFormat
- val schema = data.schema
-
+ private def writeFiles(data: DataFrame): Array[Path] = {
val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
data.write.parquet(file)
sqlContext.read
@@ -74,7 +79,6 @@ class FileStreamSink(
.inputFiles
.map(new Path(_))
.filterNot(_.getName.startsWith("_"))
- .map(_.toUri.toString)
}
override def toString: String = s"FileSink[$path]"
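
Note: the new addBatch path converts each written file's Hadoop FileStatus into a SinkFileStatus entry before committing it to the log. A minimal standalone sketch of that conversion (illustration only; the output directory below is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.execution.streaming.{FileStreamSinkLog, SinkFileStatus}

    // Mirrors the conversion performed in FileStreamSink.addBatch above.
    def toSinkFileStatus(f: org.apache.hadoop.fs.FileStatus): SinkFileStatus =
      SinkFileStatus(
        path = f.getPath.toUri.toString,
        size = f.getLen,
        isDir = f.isDirectory,
        modificationTime = f.getModificationTime,
        blockReplication = f.getReplication,
        blockSize = f.getBlockSize,
        action = FileStreamSinkLog.ADD_ACTION)

    val fs = FileSystem.get(new Configuration())
    val entries = fs.listStatus(new Path("/tmp/stream-output")).map(toSinkFileStatus)
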
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
new file mode 100644
index 0000000000..6c5449a928
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+import org.json4s.jackson.Serialization.{read, write}
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * The status of a file outputted by [[FileStreamSink]]. A file is visible only if it appears in
+ * the sink log and its action is not "delete".
+ *
+ * @param path the file path.
+ * @param size the file size.
+ * @param isDir whether this file is a directory.
+ * @param modificationTime the file last modification time.
+ * @param blockReplication the block replication.
+ * @param blockSize the block size.
+ * @param action the file action. Must be either "add" or "delete".
+ */
+case class SinkFileStatus(
+ path: String,
+ size: Long,
+ isDir: Boolean,
+ modificationTime: Long,
+ blockReplication: Int,
+ blockSize: Long,
+ action: String) {
+
+ def toFileStatus: FileStatus = {
+ new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
+ }
+}
+
+/**
+ * A special log for [[FileStreamSink]]. It writes one log file for each batch. The first line of
+ * a log file is the version number, followed by one JSON line per [[SinkFileStatus]].
+ *
+ * Because reading many small files is usually slow, [[FileStreamSinkLog]] compacts the log files
+ * every "spark.sql.streaming.fileSink.log.compactInterval" batches into a single file. A
+ * compaction reads all previous log files, merges them with the new batch, and drops the entries
+ * marked as deleted (see [[SinkFileStatus.action]]). When a reader calls `allFiles` to list the
+ * output, only the visible (non-deleted) files are returned.
+ */
+class FileStreamSinkLog(sqlContext: SQLContext, path: String)
+ extends HDFSMetadataLog[Seq[SinkFileStatus]](sqlContext, path) {
+
+ import FileStreamSinkLog._
+
+ private implicit val formats = Serialization.formats(NoTypeHints)
+
+ /**
+ * If we deleted the old files immediately after a compaction, there would be a race condition on
+ * S3: other processes might see that the old files are gone but still be unable to see the
+ * compaction file via "list". `allFiles` handles this by looking for the next compaction file
+ * directly. However, a livelock can occur if compaction happens too frequently: one process keeps
+ * deleting old files while another keeps retrying. Setting a reasonable cleanup delay avoids this.
+ */
+ private val fileCleanupDelayMs = sqlContext.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+
+ private val isDeletingExpiredLog = sqlContext.getConf(SQLConf.FILE_SINK_LOG_DELETION)
+
+ private val compactInterval = sqlContext.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+ require(compactInterval > 0,
+ s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
+ "to a positive value.")
+
+ override def batchIdToPath(batchId: Long): Path = {
+ if (isCompactionBatch(batchId, compactInterval)) {
+ new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
+ } else {
+ new Path(metadataPath, batchId.toString)
+ }
+ }
+
+ override def pathToBatchId(path: Path): Long = {
+ getBatchIdFromFileName(path.getName)
+ }
+
+ override def isBatchFile(path: Path): Boolean = {
+ try {
+ getBatchIdFromFileName(path.getName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }
+
+ override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
+ (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
+ }
+
+ override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
+ val lines = new String(bytes, UTF_8).split("\n")
+ if (lines.length == 0) {
+ throw new IllegalStateException("Incomplete log file")
+ }
+ val version = lines(0)
+ if (version != VERSION) {
+ throw new IllegalStateException(s"Unknown log version: ${version}")
+ }
+ lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
+ }
+
+ override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+ if (isCompactionBatch(batchId, compactInterval)) {
+ compact(batchId, logs)
+ } else {
+ super.add(batchId, logs)
+ }
+ }
+
+ /**
+ * Returns all files except the deleted ones.
+ */
+ def allFiles(): Array[SinkFileStatus] = {
+ var latestId = getLatest().map(_._1).getOrElse(-1L)
+ // There is a race condition when `FileStreamSink` is deleting old files and
+ // `StreamFileCatalog` is calling this method. This loop retries the read to handle
+ // that race condition.
+ while (true) {
+ if (latestId >= 0) {
+ val startId = getAllValidBatches(latestId, compactInterval)(0)
+ try {
+ val logs = get(Some(startId), Some(latestId)).flatMap(_._2)
+ return compactLogs(logs).toArray
+ } catch {
+ case e: IOException =>
+ // Another process using `FileStreamSink` may delete the batch files while
+ // `StreamFileCatalog` is reading them. However, that only happens when a compaction is
+ // deleting old files. If so, try the next compaction batch, which should exist.
+ // Otherwise this is a real IO issue, so rethrow it.
+ latestId = nextCompactionBatchId(latestId, compactInterval)
+ get(latestId).getOrElse {
+ throw e
+ }
+ }
+ } else {
+ return Array.empty
+ }
+ }
+ Array.empty
+ }
+
+ /**
+ * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
+ * corresponding `batchId` file. It will delete expired files as well if enabled.
+ */
+ private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+ val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
+ val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
+ if (super.add(batchId, compactLogs(allLogs))) {
+ if (isDeletingExpiredLog) {
+ deleteExpiredLog(batchId)
+ }
+ true
+ } else {
+ // Return false as there is another writer.
+ false
+ }
+ }
+
+ /**
+ * Since all logs before `compactionBatchId` are compacted and written into the
+ * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
+ * S3, the compaction file may not be visible to other processes right away. So we only delete
+ * files that were last modified at least `fileCleanupDelayMs` milliseconds ago.
+ */
+ private def deleteExpiredLog(compactionBatchId: Long): Unit = {
+ val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+ fileManager.list(metadataPath, new PathFilter {
+ override def accept(path: Path): Boolean = {
+ try {
+ val batchId = getBatchIdFromFileName(path.getName)
+ batchId < compactionBatchId
+ } catch {
+ case _: NumberFormatException =>
+ false
+ }
+ }
+ }).foreach { f =>
+ if (f.getModificationTime <= expiredTime) {
+ fileManager.delete(f.getPath)
+ }
+ }
+ }
+}
+
+object FileStreamSinkLog {
+ val VERSION = "v1"
+ val COMPACT_FILE_SUFFIX = ".compact"
+ val DELETE_ACTION = "delete"
+ val ADD_ACTION = "add"
+
+ def getBatchIdFromFileName(fileName: String): Long = {
+ fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
+ }
+
+ /**
+ * Returns whether this is a compaction batch. FileStreamSinkLog compacts old logs every
+ * `compactInterval` batches.
+ *
+ * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
+ */
+ def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
+ (batchId + 1) % compactInterval == 0
+ }
+
+ /**
+ * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
+ * need to do a new compaction.
+ *
+ * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method returns
+ * `Seq(2, 3, 4)` (note that it includes the previous compaction batch 2).
+ */
+ def getValidBatchesBeforeCompactionBatch(
+ compactionBatchId: Long,
+ compactInterval: Int): Seq[Long] = {
+ assert(isCompactionBatch(compactionBatchId, compactInterval),
+ s"$compactionBatchId is not a compaction batch")
+ (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
+ }
+
+ /**
+ * Returns all batches necessary to reconstruct the state up to `batchId` (inclusive). If
+ * `batchId` is a compaction batch, returns just that batch. Otherwise, finds the previous
+ * compaction batch and returns every batch from it through `batchId`.
+ */
+ def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
+ assert(batchId >= 0)
+ val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
+ start to batchId
+ }
+
+ /**
+ * Removes all deleted files from the logs. It assumes that once a file is deleted, it will not
+ * be added back to the log in the future.
+ */
+ def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
+ val deletedFiles = logs.filter(_.action == DELETE_ACTION).map(_.path).toSet
+ if (deletedFiles.isEmpty) {
+ logs
+ } else {
+ logs.filter(f => !deletedFiles.contains(f.path))
+ }
+ }
+
+ /**
+ * Returns the next compaction batch id after `batchId`.
+ */
+ def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
+ (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
+ }
+}
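
Note: the batch-id arithmetic in the companion object can be sanity-checked in isolation. A self-contained sketch that re-derives the same formulas for a compact interval of 3 (illustration only, independent of the classes above):

    object CompactionMathDemo {
      val interval = 3L

      // Same formula as FileStreamSinkLog.isCompactionBatch.
      def isCompactionBatch(batchId: Long): Boolean = (batchId + 1) % interval == 0

      // Same formula as FileStreamSinkLog.nextCompactionBatchId.
      def nextCompactionBatchId(batchId: Long): Long =
        (batchId + interval + 1) / interval * interval - 1

      // Same formula as FileStreamSinkLog.getAllValidBatches.
      def allValidBatches(batchId: Long): Seq[Long] = {
        val start = math.max(0L, (batchId + 1) / interval * interval - 1)
        start to batchId
      }

      def main(args: Array[String]): Unit = {
        println((0L to 8L).filter(isCompactionBatch).toList) // List(2, 5, 8)
        println(nextCompactionBatchId(3))                    // 5
        println(allValidBatches(7).toList)                   // List(5, 6, 7)
      }
    }
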
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 9663fee18d..b52f7a28b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -51,8 +51,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
import HDFSMetadataLog._
- private val metadataPath = new Path(path)
- private val fileManager = createFileManager()
+ val metadataPath = new Path(path)
+ protected val fileManager = createFileManager()
if (!fileManager.exists(metadataPath)) {
fileManager.mkdirs(metadataPath)
@@ -62,7 +62,21 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
* A `PathFilter` to filter only batch files
*/
private val batchFilesFilter = new PathFilter {
- override def accept(path: Path): Boolean = try {
+ override def accept(path: Path): Boolean = isBatchFile(path)
+ }
+
+ private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance()
+
+ protected def batchIdToPath(batchId: Long): Path = {
+ new Path(metadataPath, batchId.toString)
+ }
+
+ protected def pathToBatchId(path: Path) = {
+ path.getName.toLong
+ }
+
+ protected def isBatchFile(path: Path) = {
+ try {
path.getName.toLong
true
} catch {
@@ -70,18 +84,19 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
}
- private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance()
+ protected def serialize(metadata: T): Array[Byte] = {
+ JavaUtils.bufferToArray(serializer.serialize(metadata))
+ }
- private def batchFile(batchId: Long): Path = {
- new Path(metadataPath, batchId.toString)
+ protected def deserialize(bytes: Array[Byte]): T = {
+ serializer.deserialize[T](ByteBuffer.wrap(bytes))
}
override def add(batchId: Long, metadata: T): Boolean = {
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written.
- val buffer = serializer.serialize(metadata)
try {
- writeBatch(batchId, JavaUtils.bufferToArray(buffer))
+ writeBatch(batchId, serialize(metadata))
true
} catch {
case e: IOException if "java.lang.InterruptedException" == e.getMessage =>
@@ -113,8 +128,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
try {
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
- logDebug(s"Attempting to write log #${batchFile(batchId)}")
- fileManager.rename(tempPath, batchFile(batchId))
+ logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
+ fileManager.rename(tempPath, batchIdToPath(batchId))
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
@@ -158,11 +173,11 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
override def get(batchId: Long): Option[T] = {
- val batchMetadataFile = batchFile(batchId)
+ val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
val input = fileManager.open(batchMetadataFile)
val bytes = IOUtils.toByteArray(input)
- Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
+ Some(deserialize(bytes))
} else {
logDebug(s"Unable to find batch $batchMetadataFile")
None
@@ -172,7 +187,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
val files = fileManager.list(metadataPath, batchFilesFilter)
val batchIds = files
- .map(_.getPath.getName.toLong)
+ .map(f => pathToBatchId(f.getPath))
.filter { batchId =>
(endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
}
@@ -184,7 +199,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
override def getLatest(): Option[(Long, T)] = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
- .map(_.getPath.getName.toLong)
+ .map(f => pathToBatchId(f.getPath))
.sorted
.reverse
for (batchId <- batchIds) {
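
Note: these changes turn batch-file naming and (de)serialization into protected hooks, which is what lets FileStreamSinkLog store JSON lines instead of Java-serialized bytes. A hypothetical minimal subclass, shown only to illustrate the extension points:

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

    // Hypothetical log that stores each batch's metadata as a single UTF-8 string.
    class TextMetadataLog(sqlContext: SQLContext, path: String)
      extends HDFSMetadataLog[String](sqlContext, path) {

      override protected def serialize(metadata: String): Array[Byte] =
        metadata.getBytes(UTF_8)

      override protected def deserialize(bytes: Array[Byte]): String =
        new String(bytes, UTF_8)
    }
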
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
index b1f93a9159..95b5129351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.StructType
class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging {
val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
- val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString)
+ val metadataLog = new FileStreamSinkLog(sqlContext, metadataDirectory.toUri.toString)
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
override def paths: Seq[Path] = path :: Nil
@@ -53,6 +53,6 @@ class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog
override def refresh(): Unit = {}
override def allFiles(): Seq[FileStatus] = {
- fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_)))
+ metadataLog.allFiles().map(_.toFileStatus)
}
}
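
Note: the catalog now materializes Hadoop FileStatus objects straight from the log entries instead of re-listing the file system. The round trip through SinkFileStatus.toFileStatus looks like this (values are illustrative only):

    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.sql.execution.streaming.{FileStreamSinkLog, SinkFileStatus}

    val entry = SinkFileStatus(
      path = "/a/b/x",
      size = 100L,
      isDir = false,
      modificationTime = 1000L,
      blockReplication = 1,
      blockSize = 10000L,
      action = FileStreamSinkLog.ADD_ACTION)

    // toFileStatus rebuilds the Hadoop view that FileCatalog implementations expect.
    val status: FileStatus = entry.toFileStatus
    assert(status.getLen == 100L && !status.isDirectory)
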
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4ae8278a9d..80e2c1986d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.internal
import java.util.{NoSuchElementException, Properties}
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.immutable
@@ -463,6 +464,27 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
+ .internal()
+ .doc("Whether to delete the expired log files in file stream sink.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val FILE_SINK_LOG_COMPACT_INTERVAL =
+ SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval")
+ .internal()
+ .doc("Number of log files after which all the previous files " +
+ "are compacted into the next log file.")
+ .intConf
+ .createWithDefault(10)
+
+ val FILE_SINK_LOG_CLEANUP_DELAY =
+ SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
+ .internal()
+ .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(60 * 1000L) // 1 minute
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
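
Note: all three options are internal, but they can still be overridden, e.g. when exercising compaction in tests. A sketch assuming an existing SQLContext (the values are illustrative, not recommendations):

    // Compact every 5 batches, keep log deletion enabled, and delay cleanup by 10 minutes.
    sqlContext.setConf("spark.sql.streaming.fileSink.log.compactInterval", "5")
    sqlContext.setConf("spark.sql.streaming.fileSink.log.deletion", "true")
    sqlContext.setConf("spark.sql.streaming.fileSink.log.cleanupDelay", (10 * 60 * 1000L).toString)
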
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
new file mode 100644
index 0000000000..70c2a82990
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
+
+ import FileStreamSinkLog._
+
+ test("getBatchIdFromFileName") {
+ assert(1234L === getBatchIdFromFileName("1234"))
+ assert(1234L === getBatchIdFromFileName("1234.compact"))
+ intercept[NumberFormatException] {
+ FileStreamSinkLog.getBatchIdFromFileName("1234a")
+ }
+ }
+
+ test("isCompactionBatch") {
+ assert(false === isCompactionBatch(0, compactInterval = 3))
+ assert(false === isCompactionBatch(1, compactInterval = 3))
+ assert(true === isCompactionBatch(2, compactInterval = 3))
+ assert(false === isCompactionBatch(3, compactInterval = 3))
+ assert(false === isCompactionBatch(4, compactInterval = 3))
+ assert(true === isCompactionBatch(5, compactInterval = 3))
+ }
+
+ test("nextCompactionBatchId") {
+ assert(2 === nextCompactionBatchId(0, compactInterval = 3))
+ assert(2 === nextCompactionBatchId(1, compactInterval = 3))
+ assert(5 === nextCompactionBatchId(2, compactInterval = 3))
+ assert(5 === nextCompactionBatchId(3, compactInterval = 3))
+ assert(5 === nextCompactionBatchId(4, compactInterval = 3))
+ assert(8 === nextCompactionBatchId(5, compactInterval = 3))
+ }
+
+ test("getValidBatchesBeforeCompactionBatch") {
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
+ }
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
+ }
+ assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
+ }
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
+ }
+ assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
+ }
+
+ test("getAllValidBatches") {
+ assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
+ assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
+ assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
+ assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
+ assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
+ assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
+ assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
+ assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
+ assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
+ }
+
+ test("compactLogs") {
+ val logs = Seq(
+ newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION))
+ assert(logs === compactLogs(logs))
+
+ val logs2 = Seq(
+ newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.DELETE_ACTION))
+ assert(logs.dropRight(1) ++ logs2.dropRight(1) === compactLogs(logs ++ logs2))
+ }
+
+ test("serialize") {
+ withFileStreamSinkLog { sinkLog =>
+ val logs = Seq(
+ SinkFileStatus(
+ path = "/a/b/x",
+ size = 100L,
+ isDir = false,
+ modificationTime = 1000L,
+ blockReplication = 1,
+ blockSize = 10000L,
+ action = FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus(
+ path = "/a/b/y",
+ size = 200L,
+ isDir = false,
+ modificationTime = 2000L,
+ blockReplication = 2,
+ blockSize = 20000L,
+ action = FileStreamSinkLog.DELETE_ACTION),
+ SinkFileStatus(
+ path = "/a/b/z",
+ size = 300L,
+ isDir = false,
+ modificationTime = 3000L,
+ blockReplication = 3,
+ blockSize = 30000L,
+ action = FileStreamSinkLog.ADD_ACTION))
+
+ // scalastyle:off
+ val expected = s"""${FileStreamSinkLog.VERSION}
+ |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
+ |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
+ |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
+ // scalastyle:on
+ assert(expected === new String(sinkLog.serialize(logs), UTF_8))
+
+ assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
+ }
+ }
+
+ test("deserialize") {
+ withFileStreamSinkLog { sinkLog =>
+ // scalastyle:off
+ val logs = s"""${FileStreamSinkLog.VERSION}
+ |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
+ |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
+ |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
+ // scalastyle:on
+
+ val expected = Seq(
+ SinkFileStatus(
+ path = "/a/b/x",
+ size = 100L,
+ isDir = false,
+ modificationTime = 1000L,
+ blockReplication = 1,
+ blockSize = 10000L,
+ action = FileStreamSinkLog.ADD_ACTION),
+ SinkFileStatus(
+ path = "/a/b/y",
+ size = 200L,
+ isDir = false,
+ modificationTime = 2000L,
+ blockReplication = 2,
+ blockSize = 20000L,
+ action = FileStreamSinkLog.DELETE_ACTION),
+ SinkFileStatus(
+ path = "/a/b/z",
+ size = 300L,
+ isDir = false,
+ modificationTime = 3000L,
+ blockReplication = 3,
+ blockSize = 30000L,
+ action = FileStreamSinkLog.ADD_ACTION))
+
+ assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8)))
+
+ assert(Nil === sinkLog.deserialize(FileStreamSinkLog.VERSION.getBytes(UTF_8)))
+ }
+ }
+
+ test("batchIdToPath") {
+ withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+ withFileStreamSinkLog { sinkLog =>
+ assert("0" === sinkLog.batchIdToPath(0).getName)
+ assert("1" === sinkLog.batchIdToPath(1).getName)
+ assert("2.compact" === sinkLog.batchIdToPath(2).getName)
+ assert("3" === sinkLog.batchIdToPath(3).getName)
+ assert("4" === sinkLog.batchIdToPath(4).getName)
+ assert("5.compact" === sinkLog.batchIdToPath(5).getName)
+ }
+ }
+ }
+
+ test("compact") {
+ withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+ withFileStreamSinkLog { sinkLog =>
+ for (batchId <- 0 to 10) {
+ sinkLog.add(
+ batchId,
+ Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+ val expectedFiles = (0 to batchId).map {
+ id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
+ }
+ assert(sinkLog.allFiles() === expectedFiles)
+ if (isCompactionBatch(batchId, 3)) {
+ // Since batchId is a compaction batch, the batch log file should contain all logs
+ assert(sinkLog.get(batchId).getOrElse(Nil) === expectedFiles)
+ }
+ }
+ }
+ }
+ }
+
+ test("delete expired file") {
+ // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can observe the deletion behaviour
+ // deterministically
+ withSQLConf(
+ SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+ SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
+ withFileStreamSinkLog { sinkLog =>
+ val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+ def listBatchFiles(): Set[String] = {
+ fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+ try {
+ getBatchIdFromFileName(fileName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }.toSet
+ }
+
+ sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("0") === listBatchFiles())
+ sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("0", "1") === listBatchFiles())
+ sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("2.compact") === listBatchFiles())
+ sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("2.compact", "3") === listBatchFiles())
+ sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("2.compact", "3", "4") === listBatchFiles())
+ sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("5.compact") === listBatchFiles())
+ }
+ }
+ }
+
+ /**
+ * Creates a fake SinkFileStatus using the given path and action. Most tests don't care about the
+ * other fields in SinkFileStatus.
+ */
+ private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
+ SinkFileStatus(
+ path = path,
+ size = 100L,
+ isDir = false,
+ modificationTime = 100L,
+ blockReplication = 1,
+ blockSize = 100L,
+ action = action)
+ }
+
+ private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
+ withTempDir { file =>
+ val sinkLog = new FileStreamSinkLog(sqlContext, file.getCanonicalPath)
+ f(sinkLog)
+ }
+ }
+}
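
Note: a reader outside of StreamFileCatalog can list a sink's committed output the same way the catalog does. A minimal sketch, assuming an active SQLContext and a hypothetical output path:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.{FileStreamSink, FileStreamSinkLog}

    val output = new Path("/tmp/stream-output") // hypothetical sink output directory
    val logDir = new Path(output, FileStreamSink.metadataDir)
    val sinkLog = new FileStreamSinkLog(sqlContext, logDir.toUri.toString)

    // allFiles() already drops entries whose latest action is "delete".
    sinkLog.allFiles().foreach(f => println(s"${f.path} (${f.size} bytes)"))
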