aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-12-08 17:53:34 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-08 17:53:34 -0800
commit458fa3325e5f8c21c50e406ac8059d6236f93a9c (patch)
tree5bd65e219373e65ec9a029d76499e43206b73729 /external
parent202fcd21ce01393fa6dfaa1c2126e18e9b85ee96 (diff)
downloadspark-458fa3325e5f8c21c50e406ac8059d6236f93a9c.tar.gz
spark-458fa3325e5f8c21c50e406ac8059d6236f93a9c.tar.bz2
spark-458fa3325e5f8c21c50e406ac8059d6236f93a9c.zip
[SPARK-18776][SS] Make Offset for FileStreamSource corrected formatted in json
## What changes were proposed in this pull request? - Changed FileStreamSource to use new FileStreamSourceOffset rather than LongOffset. The field is named as `logOffset` to make it more clear that this is a offset in the file stream log. - Fixed bug in FileStreamSourceLog, the field endId in the FileStreamSourceLog.get(startId, endId) was not being used at all. No test caught it earlier. Only my updated tests caught it. Other minor changes - Dont use batchId in the FileStreamSource, as calling it batch id is extremely miss leading. With multiple sources, it may happen that a new batch has no new data from a file source. So offset of FileStreamSource != batchId after that batch. ## How was this patch tested? Updated unit test. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16205 from tdas/SPARK-18776.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 22668fd6fa..10b35c74f4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -90,7 +90,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
}
}
- test("read Spark 2.1.0 log format") {
+ test("read Spark 2.1.0 offset format") {
val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
assert(KafkaSourceOffset(offset) ===
KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))