author     Burak Yavuz <brkyvz@gmail.com>  2015-11-18 16:19:00 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-11-18 16:19:00 -0800
commit     921900fd06362474f8caac675803d526a0986d70 (patch)
tree       a87dffff24d98fda06c7fe74ba0502fdbe05c999 /streaming
parent     a402c92c92b2e1c85d264f6077aec8f6d6a08270 (diff)
[SPARK-11791] Fix flaky test in BatchedWriteAheadLogSuite
Stack trace of the failure:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 62 times over 1.006322071 seconds. Last failure message: Argument(s) are different! Wanted:
writeAheadLog.write(
    java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
    10
);
-> at org.apache.spark.streaming.util.BatchedWriteAheadLogSuite$$anonfun$23$$anonfun$apply$mcV$sp$15.apply(WriteAheadLogSuite.scala:518)
Actual invocation has different arguments:
writeAheadLog.write(
    java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
    10
);
-> at org.apache.spark.streaming.util.WriteAheadLogSuite$BlockingWriteAheadLog.write(WriteAheadLogSuite.scala:756)
```

I believe the issue was that, due to a race condition, the ordering of the events could get mixed up in the final ByteBuffer, so the comparison failed. By adding `eventually` between the requests, we make sure the ordering is preserved. Note that in real-life situations, the ordering across threads will not matter.

Another solution would be to implement a custom Mockito matcher that sorts and then compares the results, but that sounds like overkill to me. Let me know what you think tdas zsxwing.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #9790 from brkyvz/fix-flaky-2.
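For illustration, the custom-matcher alternative mentioned above could look roughly like the sketch below. This is not part of the patch: `MatchesRecordSet` is a hypothetical name, the `decode` helper is a stand-in for the suite's `byteBufferToString` (its decoding scheme is assumed), and it presumes Mockito 1.x (as used here via `org.mockito.Matchers`), where `ArgumentMatcher` extends Hamcrest's `BaseMatcher` and can be passed straight to `argThat`. It would also need to live in `org.apache.spark.streaming.util`, since `BatchedWriteAheadLog.deaggregate` is package-private.

```scala
package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.mockito.ArgumentMatcher

// Hypothetical sketch: an order-insensitive matcher that deaggregates the
// batched buffer and compares the decoded records as a set, so the
// thread-dependent queueing order cannot fail the verification.
class MatchesRecordSet(expected: Set[String]) extends ArgumentMatcher[ByteBuffer] {

  // Stand-in for the suite's byteBufferToString helper (decoding assumed).
  private def decode(buffer: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(buffer).toString

  override def matches(argument: Any): Boolean = argument match {
    case buffer: ByteBuffer =>
      BatchedWriteAheadLog.deaggregate(buffer).map(decode).toSet == expected
    case _ => false
  }
}

// Usage in the test would then be:
//   verify(wal, times(1)).write(argThat(new MatchesRecordSet(queuedEvents)), meq(10L))
```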
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala  |  12
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7f80d6ecdb..eaa88ea3cd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -30,6 +30,7 @@ import scala.language.{implicitConversions, postfixOps}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentCaptor
 import org.mockito.Matchers.{eq => meq}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -507,15 +508,18 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     }
     blockingWal.allowWrite()
 
-    val buffer1 = wrapArrayArrayByte(Array(event1))
-    val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
+    val buffer = wrapArrayArrayByte(Array(event1))
+    val queuedEvents = Set(event2, event3, event4, event5)
 
     eventually(timeout(1 second)) {
       assert(batchedWal.invokePrivate(queueLength()) === 0)
-      verify(wal, times(1)).write(meq(buffer1), meq(3L))
+      verify(wal, times(1)).write(meq(buffer), meq(3L))
       // the file name should be the timestamp of the last record, as events should be naturally
       // in order of timestamp, and we need the last element.
-      verify(wal, times(1)).write(meq(buffer2), meq(10L))
+      val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+      verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+      val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
+      assert(records.toSet === queuedEvents)
     }
   }
 
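The capture-and-compare pattern the patch adopts also works in isolation. Below is a minimal, self-contained sketch with a hypothetical `Sink` trait standing in for the mocked `WriteAheadLog`; the names `Sink` and `CaptorPatternExample` and the comma-joined payload are illustrative only, not Spark code.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.mockito.ArgumentCaptor
import org.mockito.Matchers.{eq => meq}
import org.mockito.Mockito._

// Hypothetical stand-in for the mocked WriteAheadLog.
trait Sink {
  def write(data: ByteBuffer, time: Long): Unit
}

object CaptorPatternExample extends App {
  val sink = mock(classOf[Sink])

  // Simulate a write whose exact payload layout the test cannot predict.
  sink.write(ByteBuffer.wrap("event3,event2".getBytes(StandardCharsets.UTF_8)), 10L)

  // Capture the argument instead of matching a concrete buffer, then assert
  // on an order-insensitive view of its contents. This is why the flaky
  // meq(buffer2) verification above was replaced with an ArgumentCaptor.
  val captor = ArgumentCaptor.forClass(classOf[ByteBuffer])
  verify(sink, times(1)).write(captor.capture(), meq(10L))
  val payload = StandardCharsets.UTF_8.decode(captor.getValue).toString
  assert(payload.split(",").toSet == Set("event2", "event3"))
}
```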