about summary refs log tree commit diff
path: root/sql/core/src/main
diff options
context:
space:
mode:
author	Tathagata Das <tathagata.das1565@gmail.com>	2017-03-27 19:04:16 -0700
committer	Tathagata Das <tathagata.das1565@gmail.com>	2017-03-27 19:04:16 -0700
commit	8a6f33f0483dcee81467e6374a796b5dbd53ea30 (patch)
tree	6e20dd10fda762d23f0f2848407c194bafa90684 /sql/core/src/main
parent	a250933c625ed720d15a0e479e9c51113605b102 (diff)
download	spark-8a6f33f0483dcee81467e6374a796b5dbd53ea30.tar.gz
	spark-8a6f33f0483dcee81467e6374a796b5dbd53ea30.tar.bz2
	spark-8a6f33f0483dcee81467e6374a796b5dbd53ea30.zip
[SPARK-19876][SS] Follow up: Refactored BatchCommitLog to simplify logic
## What changes were proposed in this pull request?

Existing logic seemingly writes null to the BatchCommitLog, even though it does additional checks to write '{}' (valid json) to the log. This PR simplifies the logic by disallowing use of `log.add(batchId, metadata)` and instead using `log.add(batchId)`. No question of specifying metadata, so no confusion related to null.

## How was this patch tested?

Existing tests pass.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17444 from tdas/SPARK-19876-1.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala | 28
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 1
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 2
3 files changed, 19 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
index fb1a4fb9b1..a34938f911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -45,33 +45,39 @@ import org.apache.spark.sql.SparkSession
class BatchCommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[String](sparkSession, path) {
+ import BatchCommitLog._
+
+ def add(batchId: Long): Unit = {
+ super.add(batchId, EMPTY_JSON)
+ }
+
+ override def add(batchId: Long, metadata: String): Boolean = {
+ throw new UnsupportedOperationException(
+ "BatchCommitLog does not take any metadata, use 'add(batchId)' instead")
+ }
+
override protected def deserialize(in: InputStream): String = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset commit log")
}
- parseVersion(lines.next().trim, BatchCommitLog.VERSION)
- // read metadata
- lines.next().trim match {
- case BatchCommitLog.SERIALIZED_VOID => null
- case metadata => metadata
- }
+ parseVersion(lines.next.trim, VERSION)
+ EMPTY_JSON
}
override protected def serialize(metadata: String, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
- out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+ out.write(s"v${VERSION}".getBytes(UTF_8))
out.write('\n')
- // write metadata or void
- out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
- .getBytes(UTF_8))
+ // write metadata
+ out.write(EMPTY_JSON.getBytes(UTF_8))
}
}
object BatchCommitLog {
private val VERSION = 1
- private val SERIALIZED_VOID = "{}"
+ private val EMPTY_JSON = "{}"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 60ce64261c..46bfc29793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -106,6 +106,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
* metadata has already been stored, this method will return `false`.
*/
override def add(batchId: Long, metadata: T): Boolean = {
+ require(metadata != null, "'null' metadata cannot written to a metadata log")
get(batchId).map(_ => false).getOrElse {
// Only write metadata when the batch has not yet been written
writeBatch(batchId, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 34e9262af7..5f548172f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -305,7 +305,7 @@ class StreamExecution(
if (dataAvailable) {
// Update committed offsets.
committedOffsets ++= availableOffsets
- batchCommitLog.add(currentBatchId, null)
+ batchCommitLog.add(currentBatchId)
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1