author      Tyson Condie <tcondie@gmail.com>          2016-11-09 15:03:22 -0800
committer   Shixiong Zhu <shixiong@databricks.com>    2016-11-09 15:03:22 -0800
commit      3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3 (patch)
tree        626afd38724496d5630f54d17acef35424c0746a /external
parent      64fbdf1aa90b66269daec29f62dc9431c1173bab (diff)
[SPARK-17829][SQL] Stable format for offset log
## What changes were proposed in this pull request?

Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:

- It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
- It is unnecessarily opaque to the user.

I'd propose we require offsets to provide a user-readable serialization and use that instead. JSON is probably a good option.

## How was this patch tested?

Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala).

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

zsxwing marmbrus

Author: Tyson Condie <tcondie@gmail.com>
Author: Tyson Condie <tcondie@clash.local>

Closes #15626 from tcondie/spark-8360.
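As a rough illustration of the round trip this change enables, below is a minimal Scala sketch. The object name is made up for illustration; it assumes the code sits inside the org.apache.spark.sql.kafka010 package (e.g. in test scope), since KafkaSourceOffset is package-private, and the JSON shape shown in the comment is indicative only.

```scala
package org.apache.spark.sql.kafka010

import org.apache.spark.sql.execution.streaming.SerializedOffset

// Hypothetical sketch object, not part of the patch.
object OffsetJsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Build an offset for two partitions of topic "t" via the tuple-based apply.
    val original = KafkaSourceOffset(("t", 0, 100L), ("t", 1, 250L))

    // The JSON form is what the offset log now persists,
    // roughly {"t":{"0":100,"1":250}}.
    val json = original.json

    // Recover the offset from the stored JSON via SerializedOffset.
    val restored = KafkaSourceOffset(SerializedOffset(json))

    // The recovered partition offsets match the originals.
    assert(KafkaSourceOffset.getPartitionOffsets(restored) ==
      KafkaSourceOffset.getPartitionOffsets(original))
  }
}
```

Because the stored form is plain JSON rather than Java-serialized bytes, a user can inspect or hand-edit the offset log, and the format no longer depends on serialVersionUIDs staying stable across Spark releases.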
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala              |  2
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala            | 19
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala      | 14
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 55
4 files changed, 82 insertions, 8 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 40d568a12c..13d717092a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.kafka010
-import java.io.Writer
-
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index b21508cd7e..5bcc5124b0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -114,7 +116,22 @@ private[kafka010] case class KafkaSource(
* `KafkaConsumer.poll` may hang forever (KAFKA-1894).
*/
private lazy val initialPartitionOffsets = {
- val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
+ val metadataLog =
+ new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
+ out.write(bytes.length)
+ out.write(bytes)
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ val length = in.read()
+ val bytes = new Array[Byte](length)
+ in.read(bytes)
+ KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+ }
+ }
+
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5ade98251..b5da415b30 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.sql.execution.streaming.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
- override def toString(): String = {
- partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
- }
+
+ override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
/** Companion object of the [[KafkaSourceOffset]] */
@@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset {
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
offset match {
case o: KafkaSourceOffset => o.partitionToOffsets
+ case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
case _ =>
throw new IllegalArgumentException(
s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
@@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset {
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
}
+
+ /**
+ * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
+ */
+ def apply(offset: SerializedOffset): KafkaSourceOffset =
+ KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 7056a41b17..881018fd95 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -17,9 +17,13 @@
package org.apache.spark.sql.kafka010
+import java.io.File
+
+import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.OffsetSuite
+import org.apache.spark.sql.test.SharedSQLContext
-class KafkaSourceOffsetSuite extends OffsetSuite {
+class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
compare(
one = KafkaSourceOffset(("t", 0, 1L)),
@@ -36,4 +40,53 @@ class KafkaSourceOffsetSuite extends OffsetSuite {
compare(
one = KafkaSourceOffset(("t", 0, 1L)),
two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+
+
+ val kso1 = KafkaSourceOffset(("t", 0, 1L))
+ val kso2 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L))
+ val kso3 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L), ("t", 1, 4L))
+
+ compare(KafkaSourceOffset(SerializedOffset(kso1.json)),
+ KafkaSourceOffset(SerializedOffset(kso2.json)))
+
+ test("basic serialization - deserialization") {
+ assert(KafkaSourceOffset.getPartitionOffsets(kso1) ==
+ KafkaSourceOffset.getPartitionOffsets(SerializedOffset(kso1.json)))
+ }
+
+
+ testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+ withTempDir { temp =>
+ // use a non-existent directory to test whether the log creates the dir
+ val dir = new File(temp, "dir")
+ val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+ val batch0 = OffsetSeq.fill(kso1)
+ val batch1 = OffsetSeq.fill(kso2, kso3)
+
+ val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
+ SerializedOffset(o.json))): _*)
+
+ val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
+ SerializedOffset(o.json))): _*)
+
+ assert(metadataLog.add(0, batch0))
+ assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+
+ assert(metadataLog.add(1, batch1))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+ assert(metadataLog.get(1) === Some(batch1Serialized))
+ assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+ assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+ // Adding the same batch does nothing
+ metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+ assert(metadataLog.get(0) === Some(batch0Serialized))
+ assert(metadataLog.get(1) === Some(batch1Serialized))
+ assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+ assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+ }
+ }
}