-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala  53
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt  1
-rw-r--r--  sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt (renamed from sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt)  0
-rw-r--r--  sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala  30
10 files changed, 95 insertions, 33 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 22668fd6fa..10b35c74f4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -90,7 +90,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
}
}
- test("read Spark 2.1.0 log format") {
+ test("read Spark 2.1.0 offset format") {
val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
assert(KafkaSourceOffset(offset) ===
KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 8494aef004..20e0dcef8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -57,7 +57,7 @@ class FileStreamSource(
private val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
- private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
+ private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)
/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
@@ -79,7 +79,7 @@ class FileStreamSource(
* `synchronized` on this method is for solving race conditions in tests. In the normal usage,
* there is no race here, so the cost of `synchronized` should be rare.
*/
- private def fetchMaxOffset(): LongOffset = synchronized {
+ private def fetchMaxOffset(): FileStreamSourceOffset = synchronized {
// All the new files found - ignore aged files and files that we have seen.
val newFiles = fetchAllFiles().filter {
case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
@@ -104,14 +104,14 @@ class FileStreamSource(
""".stripMargin)
if (batchFiles.nonEmpty) {
- maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
- FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+ metadataLogCurrentOffset += 1
+ metadataLog.add(metadataLogCurrentOffset, batchFiles.map { case (p, timestamp) =>
+ FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset)
}.toArray)
- logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
+ logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
}
- new LongOffset(maxBatchId)
+ FileStreamSourceOffset(metadataLogCurrentOffset)
}
/**
@@ -122,21 +122,19 @@ class FileStreamSource(
func
}
- /** Return the latest offset in the source */
- def currentOffset: LongOffset = synchronized {
- new LongOffset(maxBatchId)
- }
+ /** Return the latest offset in the [[FileStreamSourceLog]] */
+ def currentLogOffset: Long = synchronized { metadataLogCurrentOffset }
/**
* Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset
- val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset
+ val startOffset = start.map(FileStreamSourceOffset(_).logOffset).getOrElse(-1L)
+ val endOffset = FileStreamSourceOffset(end).logOffset
- assert(startId <= endId)
- val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
- logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
+ assert(startOffset <= endOffset)
+ val files = metadataLog.get(Some(startOffset + 1), Some(endOffset)).flatMap(_._2)
+ logInfo(s"Processing ${files.length} files from ${startOffset + 1}:$endOffset")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
val newDataSource =
DataSource(
@@ -172,7 +170,7 @@ class FileStreamSource(
files
}
- override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
+ override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
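[Note: the net effect of the FileStreamSource changes above is that getOffset/getBatch now speak FileStreamSourceOffset rather than LongOffset, while the (`start`, `end`] contract over the metadata log is unchanged. A minimal, self-contained Scala sketch of that range arithmetic, using stand-in types rather than the real Spark classes:

// Stand-in for the real offset class, for illustration only.
final case class FileStreamSourceOffset(logOffset: Long)

object OffsetRangeSketch {
  // getBatch(start, end) reads metadata-log batches in (start, end],
  // i.e. startOffset + 1 up to and including endOffset.
  def batchIdsToRead(
      start: Option[FileStreamSourceOffset],
      end: FileStreamSourceOffset): Seq[Long] = {
    val startOffset = start.map(_.logOffset).getOrElse(-1L)  // -1L when no batch has run yet
    val endOffset = end.logOffset
    require(startOffset <= endOffset, s"start $startOffset is after end $endOffset")
    ((startOffset + 1) to endOffset).toList
  }

  def main(args: Array[String]): Unit = {
    // Very first batch: start is None, so everything up to log offset 2 is read.
    println(batchIdsToRead(None, FileStreamSourceOffset(2)))                            // List(0, 1, 2)
    // Incremental batch: only the log entries strictly after offset 0.
    println(batchIdsToRead(Some(FileStreamSourceOffset(0)), FileStreamSourceOffset(2))) // List(1, 2)
  }
}

The -1L default for a missing start is what lets the very first batch pick up metadata-log entries starting at 0.]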
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 327b3ac267..81908c0cef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -78,7 +78,7 @@ class FileStreamSourceLog(
override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = {
val startBatchId = startId.getOrElse(0L)
- val endBatchId = getLatest().map(_._1).getOrElse(0L)
+ val endBatchId = endId.orElse(getLatest().map(_._1)).getOrElse(0L)
val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id =>
if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) {
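[Note: the one-line FileStreamSourceLog fix above matters because the old code silently ignored the caller's endId and always read up to the latest batch; now that getBatch passes an explicit end, that would have returned files beyond the requested offset. A stub illustration, with a plain Map standing in for the real log:

object EndIdFixSketch {
  private val log: Map[Long, String] =
    Map(0L -> "batch0", 1L -> "batch1", 2L -> "batch2", 3L -> "batch3")

  def get(startId: Option[Long], endId: Option[Long]): Seq[(Long, String)] = {
    val startBatchId = startId.getOrElse(0L)
    // Fixed behaviour from the diff: honour the caller's endId and fall back
    // to the latest batch only when it is absent.
    val endBatchId = endId.orElse(log.keys.reduceOption(_ max _)).getOrElse(0L)
    (startBatchId to endBatchId).flatMap(id => log.get(id).map(id -> _)).toList
  }

  def main(args: Array[String]): Unit = {
    println(get(Some(1L), Some(1L)))  // List((1,batch1)); the old code would have read batches 1..3
    println(get(Some(1L), None))      // List((1,batch1), (2,batch2), (3,batch3))
  }
}]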
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
new file mode 100644
index 0000000000..06d0fe6c18
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.Exception._
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Offset for the [[FileStreamSource]].
+ * @param logOffset Position in the [[FileStreamSourceLog]]
+ */
+case class FileStreamSourceOffset(logOffset: Long) extends Offset {
+ override def json: String = {
+ Serialization.write(this)(FileStreamSourceOffset.format)
+ }
+}
+
+object FileStreamSourceOffset {
+ implicit val format = Serialization.formats(NoTypeHints)
+
+ def apply(offset: Offset): FileStreamSourceOffset = {
+ offset match {
+ case f: FileStreamSourceOffset => f
+ case SerializedOffset(str) =>
+ catching(classOf[NumberFormatException]).opt {
+ FileStreamSourceOffset(str.toLong)
+ }.getOrElse {
+ Serialization.read[FileStreamSourceOffset](str)
+ }
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Invalid conversion from offset of ${offset.getClass} to FileStreamSourceOffset")
+ }
+ }
+}
+
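[Note: in use, the new offset serializes to the JSON form {"logOffset":N} and, when recovering from a checkpoint, falls back to parsing the pre-2.1.0 plain-long form. A self-contained sketch of that round trip, assuming only json4s (which Spark already ships); the case class here shadows the real one purely for illustration:

import scala.util.control.Exception._

import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

case class FileStreamSourceOffset(logOffset: Long)

object FileStreamSourceOffsetSketch {
  implicit val format: Formats = Serialization.formats(NoTypeHints)

  // Mirrors the fallback in the companion's apply(): try the pre-2.1.0
  // plain-long form first, then the JSON form written above.
  def parse(serialized: String): FileStreamSourceOffset =
    catching(classOf[NumberFormatException]).opt {
      FileStreamSourceOffset(serialized.toLong)
    }.getOrElse {
      Serialization.read[FileStreamSourceOffset](serialized)
    }

  def main(args: Array[String]): Unit = {
    println(Serialization.write(FileStreamSourceOffset(345)))  // {"logOffset":345}
    println(parse("""{"logOffset":345}"""))                    // FileStreamSourceOffset(345)
    println(parse("345"))                                      // FileStreamSourceOffset(345), legacy format
  }
}]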
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt
new file mode 100644
index 0000000000..e266a47368
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-json.txt
@@ -0,0 +1 @@
+{"logOffset":345}
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt
index 51b4008129..51b4008129 100644
--- a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0-long.txt
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
index fe5c1d44a6..988a98a758 100644
--- a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
@@ -1,4 +1,4 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
-0
-{"topic-0":{"0":1}} \ No newline at end of file
+{"logOffset":345}
+{"topic-0":{"0":1}}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 4a47c04d3f..40d0643ba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -97,7 +97,7 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), Nil,
dir.getAbsolutePath, Map.empty)
// this method should throw an exception if `fs.exists` is called during resolveRelation
- newSource.getBatch(None, LongOffset(1))
+ newSource.getBatch(None, FileStreamSourceOffset(1))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index d139efaaf8..bb4274a162 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -74,7 +74,7 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
assert(batchId === 0)
assert(offsetSeq.offsets === Seq(
- Some(SerializedOffset("0")),
+ Some(SerializedOffset("""{"logOffset":345}""")),
Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
))
assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 267c462484..bcb6852040 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -61,7 +61,7 @@ abstract class FileStreamSourceTest
val source = sources.head
val newOffset = source.withBatchingLocked {
addData(source)
- source.currentOffset + 1
+ new FileStreamSourceOffset(source.currentLogOffset + 1)
}
logInfo(s"Added file to $source at offset $newOffset")
(source, newOffset)
@@ -987,12 +987,17 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val _sources = PrivateMethod[Seq[Source]]('sources)
val fileSource =
(execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
- assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() ===
- List("keep1", "keep2", "keep3"))
- assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() ===
- List("keep2", "keep3"))
- assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() ===
- List("keep3"))
+
+ def verify(startId: Option[Int], endId: Int, expected: String*): Unit = {
+ val start = startId.map(new FileStreamSourceOffset(_))
+ val end = FileStreamSourceOffset(endId)
+ assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected)
+ }
+
+ verify(startId = None, endId = 2, "keep1", "keep2", "keep3")
+ verify(startId = Some(0), endId = 1, "keep2")
+ verify(startId = Some(0), endId = 2, "keep2", "keep3")
+ verify(startId = Some(1), endId = 2, "keep3")
true
}
)
@@ -1023,9 +1028,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
assert(options.maxFilesPerTrigger == Some(1))
}
- test("FileStreamSource offset - read Spark 2.1.0 log format") {
- val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
- assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+ test("FileStreamSource offset - read Spark 2.1.0 offset json format") {
+ val offset = readOffsetFromResource("file-source-offset-version-2.1.0-json.txt")
+ assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345))
+ }
+
+ test("FileStreamSource offset - read Spark 2.1.0 offset long format") {
+ val offset = readOffsetFromResource("file-source-offset-version-2.1.0-long.txt")
+ assert(FileStreamSourceOffset(offset) === FileStreamSourceOffset(345))
}
test("FileStreamSourceLog - read Spark 2.1.0 log format") {