diff options
author | Liwei Lin <lwlin7@gmail.com> | 2017-03-16 13:05:36 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-16 13:05:36 -0700 |
commit | 2ea214dd05da929840c15891e908384cfa695ca8 (patch) | |
tree | 2c86aead72b8b8b254639be195cdc88ed316f6af /external | |
parent | 8e8f898335f5019c0d4f3944c4aefa12a185db70 (diff) | |
download | spark-2ea214dd05da929840c15891e908384cfa695ca8.tar.gz spark-2ea214dd05da929840c15891e908384cfa695ca8.tar.bz2 spark-2ea214dd05da929840c15891e908384cfa695ca8.zip |
[SPARK-19721][SS] Good error message for version mismatch in log files
## Problem
There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.
## What changes were proposed in this pull request?
This patch made two major changes:
1. added a `parseVersion(...)` method, and based on this method, fixed the following places the way they did version checking (no other place needed to do this checking):
```
HDFSMetadataLog
- CompactibleFileStreamLog ------------> fixed with this patch
- FileStreamSourceLog ---------------> inherited the fix of `CompactibleFileStreamLog`
- FileStreamSinkLog -----------------> inherited the fix of `CompactibleFileStreamLog`
- OffsetSeqLog ------------------------> fixed with this patch
- anonymous subclass in KafkaSource ---> fixed with this patch
```
2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"`
- note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"`
## Exception message with this patch
```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```
## How was this patch tested?
unit tests
Author: Liwei Lin <lwlin7@gmail.com>
Closes #17070 from lw-lin/better-msg.
Diffstat (limited to 'external')
2 files changed, 14 insertions, 9 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 92b5d91ba4..1fb0a33829 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -100,7 +100,7 @@ private[kafka010] class KafkaSource( override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517) val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) - writer.write(VERSION) + writer.write("v" + VERSION + "\n") writer.write(metadata.json) writer.flush } @@ -111,13 +111,13 @@ private[kafka010] class KafkaSource( // HDFSMetadataLog guarantees that it never creates a partial file. assert(content.length != 0) if (content(0) == 'v') { - if (content.startsWith(VERSION)) { - KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length))) + val indexOfNewLine = content.indexOf("\n") + if (indexOfNewLine > 0) { + val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) + KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) } else { - val versionInFile = content.substring(0, content.indexOf("\n")) throw new IllegalStateException( - s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " + - s"but was $versionInFile. Please upgrade your Spark.") + s"Log file was malformed: failed to detect the log file version line.") } } else { // The log was generated by Spark 2.1.0 @@ -351,7 +351,7 @@ private[kafka010] object KafkaSource { | source option "failOnDataLoss" to "false". """.stripMargin - private val VERSION = "v1\n" + private[kafka010] val VERSION = 1 def getSortedExecutorList(sc: SparkContext): Array[String] = { val bm = sc.env.blockManager diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index bf6aad671a..7b6396e029 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -205,7 +205,7 @@ class KafkaSourceSuite extends KafkaSourceTest { override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { out.write(0) val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8)) - writer.write(s"v0\n${metadata.json}") + writer.write(s"v99999\n${metadata.json}") writer.flush } } @@ -227,7 +227,12 @@ class KafkaSourceSuite extends KafkaSourceTest { source.getOffset.get // Read initial offset } - assert(e.getMessage.contains("Please upgrade your Spark")) + Seq( + s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999", + "produced by a newer version of Spark and cannot be read by this version" + ).foreach { message => + assert(e.getMessage.contains(message)) + } } } |