aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala9
1 files changed, 5 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 220f77dc24..9825f19b86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -259,7 +259,7 @@ class StreamExecution(
case (source, available) =>
committedOffsets
.get(source)
- .map(committed => committed < available)
+ .map(committed => committed != available)
.getOrElse(true)
}
}
@@ -318,7 +318,8 @@ class StreamExecution(
// Request unprocessed data from all sources.
val newData = availableOffsets.flatMap {
- case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
+ case (source, available)
+ if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val batch = source.getBatch(current, available)
logDebug(s"Retrieving data from $source: $current -> $available")
@@ -404,10 +405,10 @@ class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is indented for use primarily when writing tests.
*/
- def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
val localCommittedOffsets = committedOffsets
- !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+ !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
}
while (notDone) {