aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorTyson Condie <tcondie@gmail.com>2016-11-29 12:36:41 -0800
committerMichael Armbrust <michael@databricks.com>2016-11-29 12:37:36 -0800
commitf643fe47f4889faf68da3da8d7850ee48df7c22f (patch)
tree4863d1155481894cb3355d4e4b80fbdef36264a9 /sql/core
parent95f79850127204c75d1b356727237ef68d042e69 (diff)
downloadspark-f643fe47f4889faf68da3da8d7850ee48df7c22f.tar.gz
spark-f643fe47f4889faf68da3da8d7850ee48df7c22f.tar.bz2
spark-f643fe47f4889faf68da3da8d7850ee48df7c22f.zip
[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing
Revise HDFSMetadataLog API such that metadata object serialization and final batch file write are separated. This will allow serialization checks without worrying about batch file name formats. marmbrus zsxwing Existing tests already ensure this API faithfully supports core functionality, i.e., creation of batch files. Author: Tyson Condie <tcondie@gmail.com> Closes #15924 from tcondie/SPARK-18498. Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala100
1 files changed, 66 insertions, 34 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f67f..1b41352893 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
- /**
- * Write a batch to a temp file then rename it to the batch file.
- *
- * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
- * valid behavior, we still need to prevent it from destroying the files.
- */
- private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
- // Use nextId to create a temp file
+ def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
var nextId = 0
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val output = fileManager.create(tempPath)
try {
writer(metadata, output)
+ return Some(tempPath)
} finally {
IOUtils.closeQuietly(output)
}
- try {
- // Try to commit the batch
- // It will fail if there is an existing file (someone has committed the batch)
- logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
- fileManager.rename(tempPath, batchIdToPath(batchId))
-
- // SPARK-17475: HDFSMetadataLog should not leak CRC files
- // If the underlying filesystem didn't rename the CRC file, delete it.
- val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
- if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
- return
- } catch {
- case e: IOException if isFileAlreadyExistsException(e) =>
- // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
- // So throw an exception to tell the user this is not a valid behavior.
- throw new ConcurrentModificationException(
- s"Multiple HDFSMetadataLog are using $path", e)
- case e: FileNotFoundException =>
- // Sometimes, "create" will succeed when multiple writers are calling it at the same
- // time. However, only one writer can call "rename" successfully, others will get
- // FileNotFoundException because the first writer has removed it.
- throw new ConcurrentModificationException(
- s"Multiple HDFSMetadataLog are using $path", e)
- }
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
// Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// metadata path. In addition, the old Streaming also have this issue, people can create
// malicious checkpoint files to crash a Streaming application too.
nextId += 1
- } finally {
- fileManager.delete(tempPath)
}
}
+ None
+ }
+
+ /**
+ * Write a batch to a temp file then rename it to the batch file.
+ *
+ * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
+ * valid behavior, we still need to prevent it from destroying the files.
+ */
+ private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
+ val tempPath = writeTempBatch(metadata, writer).getOrElse(
+ throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
+ try {
+ // Try to commit the batch
+ // It will fail if there is an existing file (someone has committed the batch)
+ logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
+ fileManager.rename(tempPath, batchIdToPath(batchId))
+
+ // SPARK-17475: HDFSMetadataLog should not leak CRC files
+ // If the underlying filesystem didn't rename the CRC file, delete it.
+ val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+ if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
+ } catch {
+ case e: IOException if isFileAlreadyExistsException(e) =>
+ // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
+ // So throw an exception to tell the user this is not a valid behavior.
+ throw new ConcurrentModificationException(
+ s"Multiple HDFSMetadataLog are using $path", e)
+ case e: FileNotFoundException =>
+ // Sometimes, "create" will succeed when multiple writers are calling it at the same
+ // time. However, only one writer can call "rename" successfully, others will get
+ // FileNotFoundException because the first writer has removed it.
+ throw new ConcurrentModificationException(
+ s"Multiple HDFSMetadataLog are using $path", e)
+ } finally {
+ fileManager.delete(tempPath)
+ }
}
private def isFileAlreadyExistsException(e: IOException): Boolean = {
@@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
(e.getMessage != null && e.getMessage.startsWith("File already exists: "))
}
+ /**
+ * @return the deserialized metadata in a batch file, or None if the file does not exist.
+ * @throws IllegalArgumentException when the path exists but does not point to a batch file.
+ */
+ def get(batchFile: Path): Option[T] = {
+ if (fileManager.exists(batchFile)) {
+ if (isBatchFile(batchFile)) {
+ get(pathToBatchId(batchFile))
+ } else {
+ throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
+ }
+ } else {
+ None
+ }
+ }
+
override def get(batchId: Long): Option[T] = {
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
@@ -251,6 +272,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
/**
+ * Get an array of [[FileStatus]] referencing batch files.
+ * The array is sorted from the most recent batch file to the
+ * oldest batch file.
+ */
+ def getOrderedBatchFiles(): Array[FileStatus] = {
+ fileManager.list(metadataPath, batchFilesFilter)
+ .sortBy(f => pathToBatchId(f.getPath))
+ .reverse
+ }
+
+ /**
* Removes all the log entry earlier than thresholdBatchId (exclusive).
*/
override def purge(thresholdBatchId: Long): Unit = {