aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala')
-rw-r--r--external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala14
1 files changed, 7 insertions, 7 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 92b5d91ba4..1fb0a33829 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -100,7 +100,7 @@ private[kafka010] class KafkaSource(
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
- writer.write(VERSION)
+ writer.write("v" + VERSION + "\n")
writer.write(metadata.json)
writer.flush
}
@@ -111,13 +111,13 @@ private[kafka010] class KafkaSource(
// HDFSMetadataLog guarantees that it never creates a partial file.
assert(content.length != 0)
if (content(0) == 'v') {
- if (content.startsWith(VERSION)) {
- KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
- val versionInFile = content.substring(0, content.indexOf("\n"))
throw new IllegalStateException(
- s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
- s"but was $versionInFile. Please upgrade your Spark.")
+ s"Log file was malformed: failed to detect the log file version line.")
}
} else {
// The log was generated by Spark 2.1.0
@@ -351,7 +351,7 @@ private[kafka010] object KafkaSource {
| source option "failOnDataLoss" to "false".
""".stripMargin
- private val VERSION = "v1\n"
+ private[kafka010] val VERSION = 1
def getSortedExecutorList(sc: SparkContext): Array[String] = {
val bm = sc.env.blockManager