diff options
author | zsxwing <zsxwing@gmail.com> | 2015-01-24 11:00:35 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-01-24 11:00:35 -0800 |
commit | 0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8 (patch) | |
tree | 653ba72c34f1f20c5a3451c8a62c15b9a7c23aaa /core/src | |
parent | 09e09c548e7722fca1cdc89bd37de2cee58f4ce9 (diff) | |
download | spark-0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8.tar.gz spark-0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8.tar.bz2 spark-0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8.zip |
[SPARK-5214][Test] Add a test to demonstrate EventLoop can be stopped in the event thread
Author: zsxwing <zsxwing@gmail.com>
Closes #4174 from zsxwing/SPARK-5214-unittest and squashes the following commits:
443e564 [zsxwing] Change the check interval to 5ms
7aaa2d7 [zsxwing] Add a test to demonstrate EventLoop can be stopped in the event thread
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index 10541f8784..1026cb2aa7 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -41,7 +41,7 @@ class EventLoopSuite extends FunSuite with Timeouts { } eventLoop.start() (1 to 100).foreach(eventLoop.post) - eventually(timeout(5 seconds), interval(200 millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert((1 to 100) === buffer.toSeq) } eventLoop.stop() @@ -76,7 +76,7 @@ class EventLoopSuite extends FunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(200 millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(e === receivedError) } eventLoop.stop() @@ -98,7 +98,7 @@ class EventLoopSuite extends FunSuite with Timeouts { } eventLoop.start() eventLoop.post(1) - eventually(timeout(5 seconds), interval(200 millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(e === receivedError) assert(eventLoop.isActive) } @@ -153,7 +153,7 @@ class EventLoopSuite extends FunSuite with Timeouts { }.start() } - eventually(timeout(5 seconds), interval(200 millis)) { + eventually(timeout(5 seconds), interval(5 millis)) { assert(threadNum * eventsFromEachThread === receivedEventsCount) } eventLoop.stop() @@ -185,4 +185,22 @@ class EventLoopSuite extends FunSuite with Timeouts { } assert(false === eventLoop.isActive) } + + test("EventLoop: stop in eventThread") { + val eventLoop = new EventLoop[Int]("test") { + + override def onReceive(event: Int): Unit = { + stop() + } + + override def onError(e: Throwable): Unit = { + } + + } + eventLoop.start() + eventLoop.post(1) + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + } } |