aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala')
-rw-r--r--external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala14
1 files changed, 10 insertions, 4 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5ade98251..b5da415b30 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.sql.execution.streaming.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
- override def toString(): String = {
- partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
- }
+
+ override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
/** Companion object of the [[KafkaSourceOffset]] */
@@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset {
def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
offset match {
case o: KafkaSourceOffset => o.partitionToOffsets
+ case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
case _ =>
throw new IllegalArgumentException(
s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
@@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset {
def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
}
+
+ /**
+ * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
+ */
+ def apply(offset: SerializedOffset): KafkaSourceOffset =
+ KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
}