diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-06 13:05:22 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-06 13:05:22 -0800 |
commit | 1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0 (patch) | |
tree | 136f26601c832c9f40ee61e2f42bd20671e04e59 /external | |
parent | cb1f10b468e7771af75cb2288d375a87ab66d316 (diff) | |
download | spark-1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0.tar.gz spark-1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0.tar.bz2 spark-1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0.zip |
[SPARK-18671][SS][TEST] Added tests to ensure stability of that all Structured Streaming log formats
## What changes were proposed in this pull request?
To be able to restart StreamingQueries across Spark version, we have already made the logs (offset log, file source log, file sink log) use json. We should added tests with actual json files in the Spark such that any incompatible changes in reading the logs is immediately caught. This PR add tests for FileStreamSourceLog, FileStreamSinkLog, and OffsetSeqLog.
## How was this patch tested?
new unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16128 from tdas/SPARK-18671.
Diffstat (limited to 'external')
2 files changed, 20 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index 13d717092a..868edb5dcd 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -81,7 +81,14 @@ private object JsonUtils { */ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = { val result = new HashMap[String, HashMap[Int, Long]]() - partitionOffsets.foreach { case (tp, off) => + implicit val ordering = new Ordering[TopicPartition] { + override def compare(x: TopicPartition, y: TopicPartition): Int = { + Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) + } + } + val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism + partitions.foreach { tp => + val off = partitionOffsets(tp) val parts = result.getOrElse(tp.topic, new HashMap[Int, Long]) parts += tp.partition -> off result += tp.topic -> parts diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index 881018fd95..c8326ffcc7 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext { Array(0 -> batch0Serialized, 1 -> batch1Serialized)) } } + + test("read Spark 2.1.0 log format") { + val offset = readFromResource("kafka-source-offset-version-2.1.0.txt") + assert(KafkaSourceOffset(offset) === + KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L))) + } + + private def readFromResource(file: String): SerializedOffset = { + import scala.io.Source + val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString + SerializedOffset(str) + } } |