diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2015-12-07 00:21:55 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-12-07 00:21:55 -0800 |
commit | 6fd9e70e3ed43836a0685507fff9949f921234f4 (patch) | |
tree | 213977d3a3ffa152071618e02157d050fcaa965c /streaming/src/test/scala | |
parent | 80a824d36eec9d9a9f092ee1741453851218ec73 (diff) | |
download | spark-6fd9e70e3ed43836a0685507fff9949f921234f4.tar.gz spark-6fd9e70e3ed43836a0685507fff9949f921234f4.tar.bz2 spark-6fd9e70e3ed43836a0685507fff9949f921234f4.zip |
[SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flaky when Jenkins load is high
We need to make sure that the last entry is indeed the last entry in the queue.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #10110 from brkyvz/batch-wal-test-fix.
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index eaa88ea3cd..ef1e89df31 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( p } - test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") { + test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") { val blockingWal = new BlockingWriteAheadLog(wal, walHandle) val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf) @@ -500,8 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // rest of the records will be batched while it takes time for 3 to get written writeAsync(batchedWal, event2, 5L) writeAsync(batchedWal, event3, 8L) - writeAsync(batchedWal, event4, 12L) - writeAsync(batchedWal, event5, 10L) + // we would like event 5 to be written before event 4 in order to test that they get + // sorted before being aggregated + writeAsync(batchedWal, event5, 12L) + eventually(timeout(1 second)) { + assert(blockingWal.isBlocked) + assert(batchedWal.invokePrivate(queueLength()) === 3) + } + writeAsync(batchedWal, event4, 10L) eventually(timeout(1 second)) { assert(walBatchingThreadPool.getActiveCount === 5) assert(batchedWal.invokePrivate(queueLength()) === 4) @@ -517,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // the file name should be the timestamp of the last record, as events should be naturally // in order of timestamp, and we need the last element. val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer]) - verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L)) + verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L)) val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString) assert(records.toSet === queuedEvents) } |