diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-12-07 00:21:55 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-12-07 00:21:55 -0800 |
commit | 6fd9e70e3ed43836a0685507fff9949f921234f4 (patch) | |
tree | 213977d3a3ffa152071618e02157d050fcaa965c /streaming/src/main/scala | |
parent | 80a824d36eec9d9a9f092ee1741453851218ec73 (diff) | |
download | spark-6fd9e70e3ed43836a0685507fff9949f921234f4.tar.gz spark-6fd9e70e3ed43836a0685507fff9949f921234f4.tar.bz2 spark-6fd9e70e3ed43836a0685507fff9949f921234f4.zip |
[SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high
We need to make sure that the last entry is indeed the last entry in the queue.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #10110 from brkyvz/batch-wal-test-fix.
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 7158abc088..b2cd524f28 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -166,10 +166,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp var segment: WriteAheadLogRecordHandle = null if (buffer.length > 0) { logDebug(s"Batched ${buffer.length} records for Write Ahead Log write") + // threads may not be able to add items in order by time + val sortedByTime = buffer.sortBy(_.time) // We take the latest record for the timestamp. Please refer to the class Javadoc for // detailed explanation - val time = buffer.last.time - segment = wrappedLog.write(aggregate(buffer), time) + val time = sortedByTime.last.time + segment = wrappedLog.write(aggregate(sortedByTime), time) } buffer.foreach(_.promise.success(segment)) } catch { |