From 2ea214dd05da929840c15891e908384cfa695ca8 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Thu, 16 Mar 2017 13:05:36 -0700
Subject: [SPARK-19721][SS] Good error message for version mismatch in log files

## Problem

There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check this version, we throw a confusing error message.

## What changes were proposed in this pull request?

This patch makes two major changes:

1. added a `parseVersion(...)` method and, based on it, fixed the version checking in the following places (no other place needed this check):
```
HDFSMetadataLog
  - CompactibleFileStreamLog ------------> fixed with this patch
    - FileStreamSourceLog ---------------> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog -----------------> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog ------------------------> fixed with this patch
  - anonymous subclass in KafkaSource ---> fixed with this patch
```

2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that newer versions can be identified via `version > 1` instead of `version != "v1"`. Note that this does not break backward compatibility: we still write out `"v1"` and read back `"v1"` (a simplified sketch of this `v<int>` convention follows below).

## Exception message with this patch

```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

Unit tests.

Author: Liwei Lin

Closes #17070 from lw-lin/better-msg.
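For illustration, here is a self-contained sketch of the `v<int>` convention described above. It is simplified from (not identical to) the patched `HDFSMetadataLog.parseVersion`, and the object name `LogVersionSketch` exists only for this example: the writer prefixes the integer version with `v`, and the reader accepts any `vN` with `N` no greater than the highest version it understands, otherwise failing with the new, explicit error.

```
// Simplified sketch of the "v<int>" log-version convention (not the exact patched code).
object LogVersionSketch {
  // Writers emit e.g. "v1" as the first line of a log file.
  def serializeVersion(version: Int): String = "v" + version

  // Readers accept "v1" .. "v<maxSupportedVersion>" and fail loudly otherwise.
  def parseVersion(text: String, maxSupportedVersion: Int): Int = {
    def malformed = new IllegalStateException(
      s"Log file was malformed: failed to read correct log version from $text.")
    if (text.isEmpty || text.charAt(0) != 'v') throw malformed
    val version =
      try text.substring(1).toInt
      catch { case _: NumberFormatException => throw malformed }
    if (version <= 0) throw malformed
    if (version > maxSupportedVersion) {
      throw new IllegalStateException(
        s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
          s"but encountered v$version. The log file was produced by a newer version of Spark " +
          "and cannot be read by this version. Please upgrade.")
    }
    version
  }
}

// parseVersion("v1", 1)  == 1
// parseVersion("v99", 1) throws "UnsupportedLogVersion: ..."
// parseVersion("xyz", 1) throws "Log file was malformed: ..."
```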
---
 .../streaming/CompactibleFileStreamLog.scala       |  9 ++---
 .../execution/streaming/FileStreamSinkLog.scala    |  4 +--
 .../execution/streaming/FileStreamSourceLog.scala  |  4 +--
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 36 +++++++++++++++++++
 .../sql/execution/streaming/OffsetSeqLog.scala     | 10 +++---
 .../streaming/CompactibleFileStreamLogSuite.scala  | 40 ++++++++++++++++++----
 .../streaming/FileStreamSinkLogSuite.scala         |  8 ++---
 .../execution/streaming/HDFSMetadataLogSuite.scala | 27 +++++++++++++++
 .../execution/streaming/OffsetSeqLogSuite.scala    | 17 +++++++++
 9 files changed, 129 insertions(+), 26 deletions(-)
(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 5a6f9e87f6..408c8f81f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession
  * doing a compaction, it will read all old log files and merge them with the new batch.
  */
 abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends HDFSMetadataLog[Array[T]](sparkSession, path) {
@@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   override def serialize(logData: Array[T], out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(metadataLogVersion.getBytes(UTF_8))
+    out.write(("v" + metadataLogVersion).getBytes(UTF_8))
     logData.foreach { data =>
       out.write('\n')
       out.write(Serialization.write(data).getBytes(UTF_8))
@@ -146,10 +146,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines.next()
-    if (version != metadataLogVersion) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
+    val version = parseVersion(lines.next(), metadataLogVersion)
     lines.map(Serialization.read[T]).toArray
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index eb6eed87ec..8d718b2164 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -77,7 +77,7 @@ object SinkFileStatus {
  * (drops the deleted files).
  */
 class FileStreamSinkLog(
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
@@ -106,7 +106,7 @@ class FileStreamSinkLog(
 }
 
 object FileStreamSinkLog {
-  val VERSION = "v1"
+  val VERSION = 1
   val DELETE_ACTION = "delete"
   val ADD_ACTION = "add"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 81908c0cef..33e6a1d5d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 
 class FileStreamSourceLog(
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
@@ -120,5 +120,5 @@ class FileStreamSourceLog(
 }
 
 object FileStreamSourceLog {
-  val VERSION = "v1"
+  val VERSION = 1
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f9e1f7de9e..60ce64261c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -195,6 +195,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       val input = fileManager.open(batchMetadataFile)
       try {
         Some(deserialize(input))
+      } catch {
+        case ise: IllegalStateException =>
+          // re-throw the exception with the log file path added
+          throw new IllegalStateException(
+            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
       } finally {
         IOUtils.closeQuietly(input)
       }
@@ -268,6 +273,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       new FileSystemManager(metadataPath, hadoopConf)
     }
   }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception when the parsed version
+   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
+   * "v123xyz" etc.)
+   */
+  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
+    if (text.length > 0 && text(0) == 'v') {
+      val version =
+        try {
+          text.substring(1, text.length).toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+              s"version from $text.")
+        }
+      if (version > 0) {
+        if (version > maxSupportedVersion) {
+          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+        } else {
+          return version
+        }
+      }
+    }
+
+    // reaching here means we failed to read the correct log version
+    throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+      s"version from $text.")
+  }
 }
 
 object HDFSMetadataLog {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index 3210d8ad64..4f8cd116f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines.next()
-    if (version != OffsetSeqLog.VERSION) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
+
+    val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)
 
     // read metadata
     val metadata = lines.next().trim match {
@@ -70,7 +68,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 
   override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+    out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8))
 
     // write metadata
     out.write('\n')
@@ -88,6 +86,6 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 }
 
 object OffsetSeqLog {
-  private val VERSION = "v1"
+  private[streaming] val VERSION = 1
   private val SERIALIZED_VOID_OFFSET = "-"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 24d92a9623..20ac06f048 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = Array("entry_1", "entry_2", "entry_3")
-        val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
+        val expected = s"""v${FakeCompactibleFileStreamLog.VERSION}
          |"entry_1"
          |"entry_2"
          |"entry_3"""".stripMargin
@@ -132,7 +132,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 
        baos.reset()
        compactibleLog.serialize(Array(), baos)
-       assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
+       assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name()))
      })
   }
 
@@ -142,7 +142,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       defaultCompactInterval = 3,
       defaultMinBatchesToRetain = 1,
       compactibleLog => {
-        val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
+        val logs = s"""v${FakeCompactibleFileStreamLog.VERSION}
          |"entry_1"
          |"entry_2"
          |"entry_3"""".stripMargin
@@ -152,10 +152,36 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 
        assert(Nil ===
          compactibleLog.deserialize(
-           new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
+           new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8))))
      })
   }
 
+  test("deserialization log written by future version") {
+    withTempDir { dir =>
+      def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
+        new FakeCompactibleFileStreamLog(
+          version,
+          _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
+          _defaultCompactInterval = 3, // this param does not matter here in this test case
+          _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
+          spark,
+          dir.getCanonicalPath)
+
+      val writer = newFakeCompactibleFileStreamLog(version = 2)
+      val reader = newFakeCompactibleFileStreamLog(version = 1)
+      writer.add(0, Array("entry"))
+      val e = intercept[IllegalStateException] {
+        reader.get(0)
+      }
+      Seq(
+        "maximum supported log version is v1, but encountered v2",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
+    }
+  }
+
   test("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
@@ -219,6 +245,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     ): Unit = {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
+        FakeCompactibleFileStreamLog.VERSION,
         fileCleanupDelayMs,
         defaultCompactInterval,
         defaultMinBatchesToRetain,
@@ -230,17 +257,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 }
 
 object FakeCompactibleFileStreamLog {
-  val VERSION = "test_version"
+  val VERSION = 1
 }
 
 class FakeCompactibleFileStreamLog(
+    metadataLogVersion: Int,
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
     _defaultMinBatchesToRetain: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[String](
-    FakeCompactibleFileStreamLog.VERSION,
+    metadataLogVersion,
     sparkSession,
     path
   ) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 340d2945ac..dd3a414659 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
          action = FileStreamSinkLog.ADD_ACTION))
 
       // scalastyle:off
-      val expected = s"""$VERSION
+      val expected = s"""v$VERSION
          |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
          |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
          |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(expected === baos.toString(UTF_8.name()))
       baos.reset()
       sinkLog.serialize(Array(), baos)
-      assert(VERSION === baos.toString(UTF_8.name()))
+      assert(s"v$VERSION" === baos.toString(UTF_8.name()))
     }
   }
 
   test("deserialize") {
     withFileStreamSinkLog { sinkLog =>
       // scalastyle:off
-      val logs = s"""$VERSION
+      val logs = s"""v$VERSION
          |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
          |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin @@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) - assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8)))) + assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8)))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 55750b9202..662c4466b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -127,6 +127,33 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } + test("HDFSMetadataLog: parseVersion") { + withTempDir { dir => + val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath) + def assertLogFileMalformed(func: => Int): Unit = { + val e = intercept[IllegalStateException] { func } + assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version")) + } + assertLogFileMalformed { metadataLog.parseVersion("", 100) } + assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) } + assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) } + assertLogFileMalformed { metadataLog.parseVersion("10", 100) } + assertLogFileMalformed { metadataLog.parseVersion("v0", 100) } + assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) } + + assert(metadataLog.parseVersion("v10", 10) === 10) + assert(metadataLog.parseVersion("v10", 100) === 10) + + val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) } + Seq( + "maximum supported log version is v100, but encountered v200", + "produced by a newer version of Spark and cannot be read by this version" + ).foreach { message => + assert(e.getMessage.contains(message)) + } + } + } + test("HDFSMetadataLog: restart") { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 5ae8b2484d..f7f0dade87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.io.File import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.test.SharedSQLContext class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { @@ -70,6 +71,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { } } + test("deserialization log written by future version") { + withTempDir { dir => + stringToFile(new File(dir, "0"), "v99999") + val log = new OffsetSeqLog(spark, dir.getCanonicalPath) + val e = intercept[IllegalStateException] { + log.get(0) + } + Seq( + s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999", + "produced by a newer version of Spark and cannot be read by this 
version" + ).foreach { message => + assert(e.getMessage.contains(message)) + } + } + } + test("read Spark 2.1.0 log format") { val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0") assert(batchId === 0) -- cgit v1.2.3