field | value | date
---|---|---
author | Tyson Condie <tcondie@gmail.com> | 2016-11-09 15:03:22 -0800
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-09 15:03:22 -0800
commit | 3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 (patch) |
tree | 626afd38724496d5630f54d17acef35424c0746a /external |
parent | 64fbdf1aa90b66269daec29f62dc9431c1173bab (diff) |
download | spark-3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3.tar.gz, spark-3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3.tar.bz2, spark-3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3.zip |
# [SPARK-17829][SQL] Stable format for offset log
## What changes were proposed in this pull request?
Currently we use Java serialization for the write-ahead log (WAL) that stores the offsets contained in each batch. This has two main issues:

1. It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
2. It is unnecessarily opaque to the user.

I'd propose we require offsets to provide a user-readable serialization and use that instead; JSON is probably a good option.
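To make the proposal concrete, here is a minimal sketch, illustration only and not code from this patch: the `CounterOffset` class and its `position` field are hypothetical, and it assumes the `Offset` base type exposes an abstract `json` member, as the `KafkaSourceOffset` diff below suggests.

```scala
import org.apache.spark.sql.execution.streaming.Offset

// Hypothetical offset type, for illustration only. Rather than writing
// Java-serialized bytes into the WAL, the offset exposes a small, stable
// JSON document that users can read directly out of the offset log and
// that any Spark release can parse back.
case class CounterOffset(position: Long) extends Offset {
  override val json: String = s"""{"position":$position}"""
}
```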
## How was this patch tested?
Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala).
cc @zsxwing @marmbrus
Author: Tyson Condie <tcondie@gmail.com>
Author: Tyson Condie <tcondie@clash.local>
Closes #15626 from tcondie/spark-8360.
Diffstat (limited to 'external')
4 files changed, 82 insertions, 8 deletions
```diff
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 40d568a12c..13d717092a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -17,8 +17,6 @@ package org.apache.spark.sql.kafka010
 
-import java.io.Writer
-
 import scala.collection.mutable.HashMap
 import scala.util.control.NonFatal
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index b21508cd7e..5bcc5124b0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -114,7 +116,22 @@ private[kafka010] case class KafkaSource(
    * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    */
   private lazy val initialPartitionOffsets = {
-    val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
+    val metadataLog =
+      new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
+          out.write(bytes.length)
+          out.write(bytes)
+        }
+
+        override def deserialize(in: InputStream): KafkaSourceOffset = {
+          val length = in.read()
+          val bytes = new Array[Byte](length)
+          in.read(bytes)
+          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+        }
+      }
+
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5ade98251..b5da415b30 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset
  */
 private[kafka010] case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
   extends Offset {
-  override def toString(): String = {
-    partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
-  }
+
+  override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
 
 /** Companion object of the [[KafkaSourceOffset]] */
@@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset {
   def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
     offset match {
       case o: KafkaSourceOffset => o.partitionToOffsets
+      case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
       case _ =>
         throw new IllegalArgumentException(
           s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
@@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset {
   def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
     KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
   }
+
+  /**
+   * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
+   */
+  def apply(offset: SerializedOffset): KafkaSourceOffset =
+    KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 7056a41b17..881018fd95 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -17,9 +17,13 @@ package org.apache.spark.sql.kafka010
 
+import java.io.File
+
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.OffsetSuite
+import org.apache.spark.sql.test.SharedSQLContext
 
-class KafkaSourceOffsetSuite extends OffsetSuite {
+class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
 
   compare(
     one = KafkaSourceOffset(("t", 0, 1L)),
@@ -36,4 +40,53 @@ class KafkaSourceOffsetSuite extends OffsetSuite {
   compare(
     one = KafkaSourceOffset(("t", 0, 1L)),
     two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+
+
+  val kso1 = KafkaSourceOffset(("t", 0, 1L))
+  val kso2 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L))
+  val kso3 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L), ("t", 1, 4L))
+
+  compare(KafkaSourceOffset(SerializedOffset(kso1.json)),
+    KafkaSourceOffset(SerializedOffset(kso2.json)))
+
+  test("basic serialization - deserialization") {
+    assert(KafkaSourceOffset.getPartitionOffsets(kso1) ==
+      KafkaSourceOffset.getPartitionOffsets(SerializedOffset(kso1.json)))
+  }
+
+
+  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+    withTempDir { temp =>
+      // use non-existent directory to test whether log make the dir
+      val dir = new File(temp, "dir")
+      val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+      val batch0 = OffsetSeq.fill(kso1)
+      val batch1 = OffsetSeq.fill(kso2, kso3)
+
+      val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
+        SerializedOffset(o.json))): _*)
+
+      val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
+        SerializedOffset(o.json))): _*)
+
+      assert(metadataLog.add(0, batch0))
+      assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+
+      assert(metadataLog.add(1, batch1))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+      assert(metadataLog.get(1) === Some(batch1Serialized))
+      assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+      assert(metadataLog.get(None, Some(1)) ===
+        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+      // Adding the same batch does nothing
+      metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+      assert(metadataLog.get(1) === Some(batch1Serialized))
+      assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+      assert(metadataLog.get(None, Some(1)) ===
+        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+    }
+  }
 }
```
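To spell out the round trip this patch enables, here is a hedged sketch. The `OffsetRoundTripExample` object is made up, and the code assumes it is compiled inside the `org.apache.spark.sql.kafka010` package (since `KafkaSourceOffset` is `private[kafka010]`); `KafkaSourceOffset`, `SerializedOffset`, and the members used are the ones shown in the diff above.

```scala
package org.apache.spark.sql.kafka010

import org.apache.spark.sql.execution.streaming.SerializedOffset

// Hypothetical example object, for illustration only.
object OffsetRoundTripExample {
  def main(args: Array[String]): Unit = {
    // Offsets for two partitions of topic "t".
    val original = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 3L))

    // The stable form is plain JSON (something like {"t":{"0":1,"1":3}}),
    // readable straight out of the offset log.
    val json = original.json

    // On recovery the log first yields an untyped SerializedOffset;
    // the new apply(SerializedOffset) parses it back into a typed offset.
    val restored = KafkaSourceOffset(SerializedOffset(json))
    assert(restored.partitionToOffsets == original.partitionToOffsets)

    // getPartitionOffsets now also accepts the serialized form directly.
    assert(KafkaSourceOffset.getPartitionOffsets(SerializedOffset(json)) ==
      original.partitionToOffsets)
  }
}
```

On disk, the `HDFSMetadataLog` override in the KafkaSource diff frames this JSON as a length-prefixed run of UTF-8 bytes, so the payload a user sees in the checkpoint is still the readable JSON document; that, rather than Java-serialized bytes, is what makes the format stable across Spark releases.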