diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 92b5d91ba4..1fb0a33829 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -100,7 +100,7 @@ private[kafka010] class KafkaSource( override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517) val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) - writer.write(VERSION) + writer.write("v" + VERSION + "\n") writer.write(metadata.json) writer.flush } @@ -111,13 +111,13 @@ private[kafka010] class KafkaSource( // HDFSMetadataLog guarantees that it never creates a partial file. assert(content.length != 0) if (content(0) == 'v') { - if (content.startsWith(VERSION)) { - KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length))) + val indexOfNewLine = content.indexOf("\n") + if (indexOfNewLine > 0) { + val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) + KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) } else { - val versionInFile = content.substring(0, content.indexOf("\n")) throw new IllegalStateException( - s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " + - s"but was $versionInFile. Please upgrade your Spark.") + s"Log file was malformed: failed to detect the log file version line.") } } else { // The log was generated by Spark 2.1.0 @@ -351,7 +351,7 @@ private[kafka010] object KafkaSource { | source option "failOnDataLoss" to "false". """.stripMargin - private val VERSION = "v1\n" + private[kafka010] val VERSION = 1 def getSortedExecutorList(sc: SparkContext): Array[String] = { val bm = sc.env.blockManager |