aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala19
1 file changed, 13 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index adf6963577..b370845481 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -186,16 +186,23 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
}.mkString("\n")
}
- override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
- if (latestBatchId.isEmpty || batchId > latestBatchId.get) {
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ val notCommitted = synchronized {
+ latestBatchId.isEmpty || batchId > latestBatchId.get
+ }
+ if (notCommitted) {
logDebug(s"Committing batch $batchId to $this")
outputMode match {
case InternalOutputModes.Append | InternalOutputModes.Update =>
- batches.append(AddedData(batchId, data.collect()))
+ val rows = AddedData(batchId, data.collect())
+ synchronized { batches += rows }
case InternalOutputModes.Complete =>
- batches.clear()
- batches += AddedData(batchId, data.collect())
+ val rows = AddedData(batchId, data.collect())
+ synchronized {
+ batches.clear()
+ batches += rows
+ }
case _ =>
throw new IllegalArgumentException(
@@ -206,7 +213,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
}
}
- def clear(): Unit = {
+ def clear(): Unit = synchronized {
batches.clear()
}