diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index b5ade98251..b5da415b30 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.Offset +import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} /** * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and @@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset */ private[kafka010] case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset { - override def toString(): String = { - partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]") - } + + override val json = JsonUtils.partitionOffsets(partitionToOffsets) } /** Companion object of the [[KafkaSourceOffset]] */ @@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset { def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = { offset match { case o: KafkaSourceOffset => o.partitionToOffsets + case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets case _ => throw new IllegalArgumentException( s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset") @@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset { def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = { KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap) } + + /** + * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]] + */ + def apply(offset: SerializedOffset): KafkaSourceOffset = + KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json)) } |