diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-08 17:53:34 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-08 17:53:34 -0800 |
commit | 458fa3325e5f8c21c50e406ac8059d6236f93a9c (patch) | |
tree | 5bd65e219373e65ec9a029d76499e43206b73729 /external | |
parent | 202fcd21ce01393fa6dfaa1c2126e18e9b85ee96 (diff) | |
download | spark-458fa3325e5f8c21c50e406ac8059d6236f93a9c.tar.gz spark-458fa3325e5f8c21c50e406ac8059d6236f93a9c.tar.bz2 spark-458fa3325e5f8c21c50e406ac8059d6236f93a9c.zip |
[SPARK-18776][SS] Make Offset for FileStreamSource corrected formatted in json
## What changes were proposed in this pull request?
- Changed FileStreamSource to use new FileStreamSourceOffset rather than LongOffset. The field is named as `logOffset` to make it more clear that this is a offset in the file stream log.
- Fixed bug in FileStreamSourceLog, the field endId in the FileStreamSourceLog.get(startId, endId) was not being used at all. No test caught it earlier. Only my updated tests caught it.
Other minor changes
- Dont use batchId in the FileStreamSource, as calling it batch id is extremely miss leading. With multiple sources, it may happen that a new batch has no new data from a file source. So offset of FileStreamSource != batchId after that batch.
## How was this patch tested?
Updated unit test.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16205 from tdas/SPARK-18776.
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 2 |
1 files changed, 1 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index 22668fd6fa..10b35c74f4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -90,7 +90,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext { } } - test("read Spark 2.1.0 log format") { + test("read Spark 2.1.0 offset format") { val offset = readFromResource("kafka-source-offset-version-2.1.0.txt") assert(KafkaSourceOffset(offset) === KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L))) |