Diffstat (limited to 'external')
 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala              |  9 ++++++++-
 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 12 ++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 13d717092a..868edb5dcd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -81,7 +81,14 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
+    partitions.foreach { tp =>
+      val off = partitionOffsets(tp)
       val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
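
The sorted serialization above can be exercised in isolation. A minimal sketch, assuming only the Kafka client's TopicPartition class (the OrderingSketch object and its sample topics are hypothetical, not part of the patch):

import org.apache.kafka.common.TopicPartition

object OrderingSketch {
  // Same (topic, partition) ordering that the patch introduces.
  implicit val ordering: Ordering[TopicPartition] = new Ordering[TopicPartition] {
    override def compare(x: TopicPartition, y: TopicPartition): Int =
      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
  }

  def main(args: Array[String]): Unit = {
    val unsorted = Seq(
      new TopicPartition("topic2", 0),
      new TopicPartition("topic1", 1),
      new TopicPartition("topic1", 0))
    // Sorting first by topic, then by partition, makes the serialized
    // offset order independent of the input Map's hash iteration order.
    println(unsorted.sorted.mkString(", "))  // prints: topic1-0, topic1-1, topic2-0
  }
}
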
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 881018fd95..c8326ffcc7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
 }
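
For context, partitionOffsets renders the per-partition offsets as a JSON string of the shape topic -> (partition -> offset). Below is a minimal hand-rolled sketch of that rendering, using the offsets asserted in the new test; Spark's JsonUtils delegates the actual serialization to a JSON library, and the exact contents of kafka-source-offset-version-2.1.0.txt are an assumption inferred from the test, not quoted from the resource file:

import org.apache.kafka.common.TopicPartition

object OffsetJsonSketch {
  def main(args: Array[String]): Unit = {
    // The offsets asserted by "read Spark 2.1.0 log format" above.
    val offsets = Map(
      new TopicPartition("topic1", 0) -> 456L,
      new TopicPartition("topic1", 1) -> 789L,
      new TopicPartition("topic2", 0) -> 0L)

    // Sort by (topic, partition) and group by topic, mirroring the patched
    // partitionOffsets, then render the nested structure as JSON by hand.
    val json = offsets.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .groupBy { case (tp, _) => tp.topic }
      .toSeq.sortBy(_._1)
      .map { case (topic, entries) =>
        val parts = entries
          .map { case (tp, off) => s""""${tp.partition}":$off""" }
          .mkString(",")
        s""""$topic":{$parts}"""
      }
      .mkString("{", ",", "}")

    println(json)  // {"topic1":{"0":456,"1":789},"topic2":{"0":0}}
  }
}
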