aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-01-24 11:00:35 -0800
committerReynold Xin <rxin@databricks.com>2015-01-24 11:00:35 -0800
commit0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8 (patch)
tree653ba72c34f1f20c5a3451c8a62c15b9a7c23aaa
parent09e09c548e7722fca1cdc89bd37de2cee58f4ce9 (diff)
downloadspark-0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8.tar.gz
spark-0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8.tar.bz2
spark-0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8.zip
[SPARK-5214][Test] Add a test to demonstrate EventLoop can be stopped in the event thread
Author: zsxwing <zsxwing@gmail.com> Closes #4174 from zsxwing/SPARK-5214-unittest and squashes the following commits: 443e564 [zsxwing] Change the check interval to 5ms 7aaa2d7 [zsxwing] Add a test to demonstrate EventLoop can be stopped in the event thread
-rw-r--r--core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala26
1 files changed, 22 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index 10541f8784..1026cb2aa7 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -41,7 +41,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
}
eventLoop.start()
(1 to 100).foreach(eventLoop.post)
- eventually(timeout(5 seconds), interval(200 millis)) {
+ eventually(timeout(5 seconds), interval(5 millis)) {
assert((1 to 100) === buffer.toSeq)
}
eventLoop.stop()
@@ -76,7 +76,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(200 millis)) {
+ eventually(timeout(5 seconds), interval(5 millis)) {
assert(e === receivedError)
}
eventLoop.stop()
@@ -98,7 +98,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(200 millis)) {
+ eventually(timeout(5 seconds), interval(5 millis)) {
assert(e === receivedError)
assert(eventLoop.isActive)
}
@@ -153,7 +153,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
}.start()
}
- eventually(timeout(5 seconds), interval(200 millis)) {
+ eventually(timeout(5 seconds), interval(5 millis)) {
assert(threadNum * eventsFromEachThread === receivedEventsCount)
}
eventLoop.stop()
@@ -185,4 +185,22 @@ class EventLoopSuite extends FunSuite with Timeouts {
}
assert(false === eventLoop.isActive)
}
+
+ test("EventLoop: stop in eventThread") {
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onReceive(event: Int): Unit = {
+ stop()
+ }
+
+ override def onError(e: Throwable): Unit = {
+ }
+
+ }
+ eventLoop.start()
+ eventLoop.post(1)
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ }
}