aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala72
1 files changed, 72 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index 1026cb2aa7..47b535206c 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -203,4 +203,76 @@ class EventLoopSuite extends FunSuite with Timeouts {
assert(!eventLoop.isActive)
}
}
+
+ test("EventLoop: stop() in onStart should call onStop") {
+ @volatile var onStopCalled: Boolean = false
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onStart(): Unit = {
+ stop()
+ }
+
+ override def onReceive(event: Int): Unit = {
+ }
+
+ override def onError(e: Throwable): Unit = {
+ }
+
+ override def onStop(): Unit = {
+ onStopCalled = true
+ }
+ }
+ eventLoop.start()
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ assert(onStopCalled)
+ }
+
+ test("EventLoop: stop() in onReceive should call onStop") {
+ @volatile var onStopCalled: Boolean = false
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onReceive(event: Int): Unit = {
+ stop()
+ }
+
+ override def onError(e: Throwable): Unit = {
+ }
+
+ override def onStop(): Unit = {
+ onStopCalled = true
+ }
+ }
+ eventLoop.start()
+ eventLoop.post(1)
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ assert(onStopCalled)
+ }
+
+ test("EventLoop: stop() in onError should call onStop") {
+ @volatile var onStopCalled: Boolean = false
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onReceive(event: Int): Unit = {
+ throw new RuntimeException("Oops")
+ }
+
+ override def onError(e: Throwable): Unit = {
+ stop()
+ }
+
+ override def onStop(): Unit = {
+ onStopCalled = true
+ }
+ }
+ eventLoop.start()
+ eventLoop.post(1)
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ assert(onStopCalled)
+ }
}