From 8a6f33f0483dcee81467e6374a796b5dbd53ea30 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 27 Mar 2017 19:04:16 -0700
Subject: [SPARK-19876][SS] Follow up: Refactored BatchCommitLog to simplify logic

## What changes were proposed in this pull request?

The existing logic seemingly writes null to the BatchCommitLog, even though it performs additional checks so that '{}' (valid JSON) is what actually gets written to the log. This PR simplifies the logic by disallowing `log.add(batchId, metadata)` in favor of `log.add(batchId)`. Since callers can no longer specify metadata at all, there is no confusion related to null.
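As a sketch of the resulting contract, the stand-in classes below mirror the shapes in the diff (an `add(batchId)` entry point, a disallowed two-argument overload, and a null guard in the base log). They are simplified stand-ins for illustration, not the actual Spark implementation:

```scala
import scala.collection.mutable

// Hypothetical stand-in for HDFSMetadataLog: null metadata is rejected up front.
class MetadataLog[T <: AnyRef] {
  private val batches = mutable.Map.empty[Long, T]

  def add(batchId: Long, metadata: T): Boolean = {
    require(metadata != null, "'null' metadata cannot be written to a metadata log")
    if (batches.contains(batchId)) false
    else { batches(batchId) = metadata; true }
  }
}

// Hypothetical stand-in for BatchCommitLog: callers can never supply metadata.
class CommitLog extends MetadataLog[String] {
  import CommitLog._

  // The only supported way to record a commit; the payload is fixed to "{}".
  def add(batchId: Long): Unit = super.add(batchId, EMPTY_JSON)

  // The inherited two-argument form is explicitly disallowed.
  override def add(batchId: Long, metadata: String): Boolean =
    throw new UnsupportedOperationException(
      "CommitLog does not take any metadata, use 'add(batchId)' instead")
}

object CommitLog {
  private val EMPTY_JSON = "{}"
}

object Demo extends App {
  val log = new CommitLog
  log.add(1L)           // ok: commit recorded with "{}" as the payload
  // log.add(2L, null)  // would throw UnsupportedOperationException
}
```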
## How was this patch tested?

Existing tests pass.

Author: Tathagata Das

Closes #17444 from tdas/SPARK-19876-1.
---
 .../sql/execution/streaming/BatchCommitLog.scala  | 28 +++++++++++++---------
 .../sql/execution/streaming/HDFSMetadataLog.scala |  1 +
 .../sql/execution/streaming/StreamExecution.scala |  2 +-
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
index fb1a4fb9b1..a34938f911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -45,33 +45,39 @@ import org.apache.spark.sql.SparkSession
 class BatchCommitLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[String](sparkSession, path) {
 
+  import BatchCommitLog._
+
+  def add(batchId: Long): Unit = {
+    super.add(batchId, EMPTY_JSON)
+  }
+
+  override def add(batchId: Long, metadata: String): Boolean = {
+    throw new UnsupportedOperationException(
+      "BatchCommitLog does not take any metadata, use 'add(batchId)' instead")
+  }
+
   override protected def deserialize(in: InputStream): String = {
     // called inside a try-finally where the underlying stream is closed in the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
     }
-    parseVersion(lines.next().trim, BatchCommitLog.VERSION)
-    // read metadata
-    lines.next().trim match {
-      case BatchCommitLog.SERIALIZED_VOID => null
-      case metadata => metadata
-    }
+    parseVersion(lines.next.trim, VERSION)
+    EMPTY_JSON
   }
 
   override protected def serialize(metadata: String, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+    out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
-    // write metadata or void
-    out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
-      .getBytes(UTF_8))
+    // write metadata
+    out.write(EMPTY_JSON.getBytes(UTF_8))
   }
 }
 
 object BatchCommitLog {
   private val VERSION = 1
-  private val SERIALIZED_VOID = "{}"
+  private val EMPTY_JSON = "{}"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 60ce64261c..46bfc29793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -106,6 +106,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * metadata has already been stored, this method will return `false`.
    */
   override def add(batchId: Long, metadata: T): Boolean = {
+    require(metadata != null, "'null' metadata cannot be written to a metadata log")
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
       writeBatch(batchId, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 34e9262af7..5f548172f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -305,7 +305,7 @@ class StreamExecution(
         if (dataAvailable) {
           // Update committed offsets.
           committedOffsets ++= availableOffsets
-          batchCommitLog.add(currentBatchId, null)
+          batchCommitLog.add(currentBatchId)
           logDebug(s"batch ${currentBatchId} committed")
           // We'll increase currentBatchId after we complete processing current batch's data
           currentBatchId += 1
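For reference, a minimal runnable sketch of the on-disk commit-file format implied by `serialize`/`deserialize` above: a version line (`v1`) followed by the fixed `{}` payload. The inline version check below stands in for `parseVersion`, which lives in `HDFSMetadataLog` and is not part of this diff:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.{Source => IOSource}

// Sketch of the commit-file format; not the actual Spark classes.
object CommitFileFormat {
  private val VERSION = 1
  private val EMPTY_JSON = "{}"

  // Mirrors BatchCommitLog.serialize: one version line, then "{}".
  def serialize(out: OutputStream): Unit = {
    out.write(s"v${VERSION}".getBytes(UTF_8))
    out.write('\n')
    out.write(EMPTY_JSON.getBytes(UTF_8))
  }

  // Mirrors BatchCommitLog.deserialize: validate the version line, then
  // return the fixed payload regardless of what follows it.
  def deserialize(in: InputStream): String = {
    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
    if (!lines.hasNext) {
      throw new IllegalStateException("Incomplete log file in the offset commit log")
    }
    val version = lines.next().trim
    require(version == s"v${VERSION}", s"Unexpected log version: $version")
    EMPTY_JSON
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    serialize(buf)
    println(deserialize(new ByteArrayInputStream(buf.toByteArray)))  // prints: {}
  }
}
```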