aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala100
1 files changed, 66 insertions, 34 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f67f..1b41352893 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
- /**
- * Write a batch to a temp file then rename it to the batch file.
- *
- * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
- * valid behavior, we still need to prevent it from destroying the files.
- */
- private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
- // Use nextId to create a temp file
+ def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
var nextId = 0
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val output = fileManager.create(tempPath)
try {
writer(metadata, output)
+ return Some(tempPath)
} finally {
IOUtils.closeQuietly(output)
}
- try {
- // Try to commit the batch
- // It will fail if there is an existing file (someone has committed the batch)
- logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
- fileManager.rename(tempPath, batchIdToPath(batchId))
-
- // SPARK-17475: HDFSMetadataLog should not leak CRC files
- // If the underlying filesystem didn't rename the CRC file, delete it.
- val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
- if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
- return
- } catch {
- case e: IOException if isFileAlreadyExistsException(e) =>
- // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
- // So throw an exception to tell the user this is not a valid behavior.
- throw new ConcurrentModificationException(
- s"Multiple HDFSMetadataLog are using $path", e)
- case e: FileNotFoundException =>
- // Sometimes, "create" will succeed when multiple writers are calling it at the same
- // time. However, only one writer can call "rename" successfully, others will get
- // FileNotFoundException because the first writer has removed it.
- throw new ConcurrentModificationException(
- s"Multiple HDFSMetadataLog are using $path", e)
- }
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// metadata path. In addition, the old Streaming also have this issue, people can create
// malicious checkpoint files to crash a Streaming application too.
nextId += 1
- } finally {
- fileManager.delete(tempPath)
}
}
+ None
+ }
+
+ /**
+ * Write a batch to a temp file then rename it to the batch file.
+ *
+ * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
+ * valid behavior, we still need to prevent it from destroying the files.
+ */
+ private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
+ val tempPath = writeTempBatch(metadata, writer).getOrElse(
+ throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
+ try {
+ // Try to commit the batch
+ // It will fail if there is an existing file (someone has committed the batch)
+ logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
+ fileManager.rename(tempPath, batchIdToPath(batchId))
+
+ // SPARK-17475: HDFSMetadataLog should not leak CRC files
+ // If the underlying filesystem didn't rename the CRC file, delete it.
+ val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+ if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
+ } catch {
+ case e: IOException if isFileAlreadyExistsException(e) =>
+ // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
+ // So throw an exception to tell the user this is not a valid behavior.
+ throw new ConcurrentModificationException(
+ s"Multiple HDFSMetadataLog are using $path", e)
+ case e: FileNotFoundException =>
+ // Sometimes, "create" will succeed when multiple writers are calling it at the same
+ // time. However, only one writer can call "rename" successfully, others will get
+ // FileNotFoundException because the first writer has removed it.
+ throw new ConcurrentModificationException(
+ s"Multiple HDFSMetadataLog are using $path", e)
+ } finally {
+ fileManager.delete(tempPath)
+ }
}
private def isFileAlreadyExistsException(e: IOException): Boolean = {
@@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}
+ /**
+ * @return the deserialized metadata in a batch file, or None if file not exist.
+ * @throws IllegalArgumentException when path does not point to a batch file.
+ */
+ def get(batchFile: Path): Option[T] = {
+ if (fileManager.exists(batchFile)) {
+ if (isBatchFile(batchFile)) {
+ get(pathToBatchId(batchFile))
+ } else {
+ throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
+ }
+ } else {
+ None
+ }
+ }
+
override def get(batchId: Long): Option[T] = {
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
@@ -251,6 +272,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
/**
+ * Get an array of [FileStatus] referencing batch files.
+ * The array is sorted by most recent batch file first to
+ * oldest batch file.
+ */
+ def getOrderedBatchFiles(): Array[FileStatus] = {
+ fileManager.list(metadataPath, batchFilesFilter)
+ .sortBy(f => pathToBatchId(f.getPath))
+ .reverse
+ }
+
+ /**
* Removes all the log entry earlier than thresholdBatchId (exclusive).
*/
override def purge(thresholdBatchId: Long): Unit = {