author     Liwei Lin <lwlin7@gmail.com>            2017-03-16 13:05:36 -0700
committer  Shixiong Zhu <shixiong@databricks.com>  2017-03-16 13:05:36 -0700
commit     2ea214dd05da929840c15891e908384cfa695ca8
tree       2c86aead72b8b8b254639be195cdc88ed316f6af /sql
parent     8e8f898335f5019c0d4f3944c4aefa12a185db70
[SPARK-19721][SS] Good error message for version mismatch in log files
## Problem

There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.

## What changes were proposed in this pull request?

This patch makes two major changes:

1. Added a `parseVersion(...)` method and, based on it, fixed version checking in the following places (no other place needed this check):
```
HDFSMetadataLog
  - CompactibleFileStreamLog  -----------> fixed with this patch
  - FileStreamSourceLog  ----------------> inherits the fix from `CompactibleFileStreamLog`
  - FileStreamSinkLog  ------------------> inherits the fix from `CompactibleFileStreamLog`
  - OffsetSeqLog  -----------------------> fixed with this patch
  - anonymous subclass in KafkaSource  --> fixed with this patch
```

2. Changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION`, etc. from `String` to `Int`, so that newer versions can be identified via `version > 1` instead of `version != "v1"`. Note that this does not break backwards compatibility: we still write out `"v1"` and read back `"v1"`.

## Exception message with this patch

```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

Unit tests.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17070 from lw-lin/better-msg.
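As context for the diff below, here is a minimal, self-contained Scala sketch of the version handling this commit message describes. The `VersionCheckSketch` object, the `serializeVersion` helper, and the `main` driver are illustrative additions; the parsing logic mirrors the `parseVersion(...)` that the patch adds to `HDFSMetadataLog`.

```scala
// A minimal sketch of the "v<N>" version scheme described above; illustrative
// only, not the exact Spark implementation.
object VersionCheckSketch {

  // Logs are still written with a "v" prefix (e.g. "v1"), but versions are
  // compared as Ints, so "newer than supported" is simply an integer compare.
  def serializeVersion(version: Int): String = "v" + version

  def parseVersion(text: String, maxSupportedVersion: Int): Int = {
    if (text.length > 0 && text(0) == 'v') {
      val version =
        try {
          text.substring(1).toInt
        } catch {
          case _: NumberFormatException =>
            throw new IllegalStateException(
              s"Log file was malformed: failed to read correct log version from $text.")
        }
      if (version > 0) {
        if (version > maxSupportedVersion) {
          throw new IllegalStateException(
            s"UnsupportedLogVersion: maximum supported log version is " +
              s"v$maxSupportedVersion, but encountered v$version.")
        }
        return version
      }
    }
    // Reaching here means the text was not of the form "v<positive int>".
    throw new IllegalStateException(
      s"Log file was malformed: failed to read correct log version from $text.")
  }

  def main(args: Array[String]): Unit = {
    assert(parseVersion(serializeVersion(1), maxSupportedVersion = 1) == 1) // round-trips
    assert(parseVersion("v10", maxSupportedVersion = 100) == 10)
    // parseVersion("v99", 1)  // would throw: UnsupportedLogVersion ... encountered v99
    // parseVersion("xyz", 1)  // would throw: Log file was malformed ...
  }
}
```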
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala       |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala              |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala            |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala                | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala                   | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala  | 40
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala         |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala           | 27
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala              | 17
9 files changed, 129 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 5a6f9e87f6..408c8f81f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession
* doing a compaction, it will read all old log files and merge them with the new batch.
*/
abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
- metadataLogVersion: String,
+ metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends HDFSMetadataLog[Array[T]](sparkSession, path) {
@@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
override def serialize(logData: Array[T], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
- out.write(metadataLogVersion.getBytes(UTF_8))
+ out.write(("v" + metadataLogVersion).getBytes(UTF_8))
logData.foreach { data =>
out.write('\n')
out.write(Serialization.write(data).getBytes(UTF_8))
@@ -146,10 +146,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
- val version = lines.next()
- if (version != metadataLogVersion) {
- throw new IllegalStateException(s"Unknown log version: ${version}")
- }
+ val version = parseVersion(lines.next(), metadataLogVersion)
lines.map(Serialization.read[T]).toArray
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index eb6eed87ec..8d718b2164 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -77,7 +77,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(
- metadataLogVersion: String,
+ metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
@@ -106,7 +106,7 @@ class FileStreamSinkLog(
}
object FileStreamSinkLog {
- val VERSION = "v1"
+ val VERSION = 1
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 81908c0cef..33e6a1d5d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf
class FileStreamSourceLog(
- metadataLogVersion: String,
+ metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
@@ -120,5 +120,5 @@ class FileStreamSourceLog(
}
object FileStreamSourceLog {
- val VERSION = "v1"
+ val VERSION = 1
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f9e1f7de9e..60ce64261c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -195,6 +195,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val input = fileManager.open(batchMetadataFile)
try {
Some(deserialize(input))
+ } catch {
+ case ise: IllegalStateException =>
+ // re-throw the exception with the log file path added
+ throw new IllegalStateException(
+ s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
} finally {
IOUtils.closeQuietly(input)
}
@@ -268,6 +273,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
new FileSystemManager(metadataPath, hadoopConf)
}
}
+
+ /**
+ * Parse the log version from the given `text` -- will throw exception when the parsed version
+ * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
+ * "v123xyz" etc.)
+ */
+ private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
+ if (text.length > 0 && text(0) == 'v') {
+ val version =
+ try {
+ text.substring(1, text.length).toInt
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+ s"version from $text.")
+ }
+ if (version > 0) {
+ if (version > maxSupportedVersion) {
+ throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+ s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+ s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+ } else {
+ return version
+ }
+ }
+ }
+
+ // reaching here means we failed to read the correct log version
+ throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+ s"version from $text.")
+ }
}
object HDFSMetadataLog {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index 3210d8ad64..4f8cd116f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}
- val version = lines.next()
- if (version != OffsetSeqLog.VERSION) {
- throw new IllegalStateException(s"Unknown log version: ${version}")
- }
+
+ val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)
// read metadata
val metadata = lines.next().trim match {
@@ -70,7 +68,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
- out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+ out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8))
// write metadata
out.write('\n')
@@ -88,6 +86,6 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
}
object OffsetSeqLog {
- private val VERSION = "v1"
+ private[streaming] val VERSION = 1
private val SERIALIZED_VOID_OFFSET = "-"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 24d92a9623..20ac06f048 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = Array("entry_1", "entry_2", "entry_3")
- val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
+ val expected = s"""v${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
|"entry_2"
|"entry_3"""".stripMargin
@@ -132,7 +132,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
baos.reset()
compactibleLog.serialize(Array(), baos)
- assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
+ assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name()))
})
}
@@ -142,7 +142,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
defaultCompactInterval = 3,
defaultMinBatchesToRetain = 1,
compactibleLog => {
- val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
+ val logs = s"""v${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
|"entry_2"
|"entry_3"""".stripMargin
@@ -152,10 +152,36 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
assert(Nil ===
compactibleLog.deserialize(
- new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
+ new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8))))
})
}
+ test("deserialization log written by future version") {
+ withTempDir { dir =>
+ def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
+ new FakeCompactibleFileStreamLog(
+ version,
+ _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
+ _defaultCompactInterval = 3, // this param does not matter here in this test case
+ _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
+ spark,
+ dir.getCanonicalPath)
+
+ val writer = newFakeCompactibleFileStreamLog(version = 2)
+ val reader = newFakeCompactibleFileStreamLog(version = 1)
+ writer.add(0, Array("entry"))
+ val e = intercept[IllegalStateException] {
+ reader.get(0)
+ }
+ Seq(
+ "maximum supported log version is v1, but encountered v2",
+ "produced by a newer version of Spark and cannot be read by this version"
+ ).foreach { message =>
+ assert(e.getMessage.contains(message))
+ }
+ }
+ }
+
test("compact") {
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
@@ -219,6 +245,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
): Unit = {
withTempDir { file =>
val compactibleLog = new FakeCompactibleFileStreamLog(
+ FakeCompactibleFileStreamLog.VERSION,
fileCleanupDelayMs,
defaultCompactInterval,
defaultMinBatchesToRetain,
@@ -230,17 +257,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
}
object FakeCompactibleFileStreamLog {
- val VERSION = "test_version"
+ val VERSION = 1
}
class FakeCompactibleFileStreamLog(
+ metadataLogVersion: Int,
_fileCleanupDelayMs: Long,
_defaultCompactInterval: Int,
_defaultMinBatchesToRetain: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[String](
- FakeCompactibleFileStreamLog.VERSION,
+ metadataLogVersion,
sparkSession,
path
) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 340d2945ac..dd3a414659 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
action = FileStreamSinkLog.ADD_ACTION))
// scalastyle:off
- val expected = s"""$VERSION
+ val expected = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
assert(expected === baos.toString(UTF_8.name()))
baos.reset()
sinkLog.serialize(Array(), baos)
- assert(VERSION === baos.toString(UTF_8.name()))
+ assert(s"v$VERSION" === baos.toString(UTF_8.name()))
}
}
test("deserialize") {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
- val logs = s"""$VERSION
+ val logs = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
- assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
+ assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8))))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 55750b9202..662c4466b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -127,6 +127,33 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ test("HDFSMetadataLog: parseVersion") {
+ withTempDir { dir =>
+ val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+ def assertLogFileMalformed(func: => Int): Unit = {
+ val e = intercept[IllegalStateException] { func }
+ assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version"))
+ }
+ assertLogFileMalformed { metadataLog.parseVersion("", 100) }
+ assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) }
+ assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) }
+ assertLogFileMalformed { metadataLog.parseVersion("10", 100) }
+ assertLogFileMalformed { metadataLog.parseVersion("v0", 100) }
+ assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) }
+
+ assert(metadataLog.parseVersion("v10", 10) === 10)
+ assert(metadataLog.parseVersion("v10", 100) === 10)
+
+ val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) }
+ Seq(
+ "maximum supported log version is v100, but encountered v200",
+ "produced by a newer version of Spark and cannot be read by this version"
+ ).foreach { message =>
+ assert(e.getMessage.contains(message))
+ }
+ }
+ }
+
test("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 5ae8b2484d..f7f0dade87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.io.File
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.test.SharedSQLContext
class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -70,6 +71,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ test("deserialization log written by future version") {
+ withTempDir { dir =>
+ stringToFile(new File(dir, "0"), "v99999")
+ val log = new OffsetSeqLog(spark, dir.getCanonicalPath)
+ val e = intercept[IllegalStateException] {
+ log.get(0)
+ }
+ Seq(
+ s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999",
+ "produced by a newer version of Spark and cannot be read by this version"
+ ).foreach { message =>
+ assert(e.getMessage.contains(message))
+ }
+ }
+ }
+
test("read Spark 2.1.0 log format") {
val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
assert(batchId === 0)