diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 220f77dc24..9825f19b86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -259,7 +259,7 @@ class StreamExecution( case (source, available) => committedOffsets .get(source) - .map(committed => committed < available) + .map(committed => committed != available) .getOrElse(true) } } @@ -318,7 +318,8 @@ class StreamExecution( // Request unprocessed data from all sources. val newData = availableOffsets.flatMap { - case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) => + case (source, available) + if committedOffsets.get(source).map(_ != available).getOrElse(true) => val current = committedOffsets.get(source) val batch = source.getBatch(current, available) logDebug(s"Retrieving data from $source: $current -> $available") @@ -404,10 +405,10 @@ class StreamExecution( * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is indented for use primarily when writing tests. */ - def awaitOffset(source: Source, newOffset: Offset): Unit = { + private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = { def notDone = { val localCommittedOffsets = committedOffsets - !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset + !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset } while (notDone) { |