diff options
author | zsxwing <zsxwing@gmail.com> | 2015-04-02 22:54:30 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-04-02 22:54:30 -0700 |
commit | 440ea31b76aa7e813436271fd63880c7bcd69157 (patch) | |
tree | 363c3d7b7a87bd492999a366ff9b7bd509ab8222 /core | |
parent | 6e1c1ec67bc4d7e5700f523ec08db6bb25bd2302 (diff) | |
download | spark-440ea31b76aa7e813436271fd63880c7bcd69157.tar.gz spark-440ea31b76aa7e813436271fd63880c7bcd69157.tar.bz2 spark-440ea31b76aa7e813436271fd63880c7bcd69157.zip |
[SPARK-6621][Core] Fix the bug that calling EventLoop.stop in EventLoop.onReceive/onError/onStart doesn't call onStop
Author: zsxwing <zsxwing@gmail.com>
Closes #5280 from zsxwing/SPARK-6621 and squashes the following commits:
521125e [zsxwing] Fix the bug that calling EventLoop.stop in EventLoop.onReceive and EventLoop.onError doesn't call onStop
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/EventLoop.scala | 18 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala | 72 |
2 files changed, 87 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index b0ed908b84..e9b2b8d24b 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ -76,9 +76,21 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { def stop(): Unit = { if (stopped.compareAndSet(false, true)) { eventThread.interrupt() - eventThread.join() - // Call onStop after the event thread exits to make sure onReceive happens before onStop - onStop() + var onStopCalled = false + try { + eventThread.join() + // Call onStop after the event thread exits to make sure onReceive happens before onStop + onStopCalled = true + onStop() + } catch { + case ie: InterruptedException => + Thread.currentThread().interrupt() + if (!onStopCalled) { + // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since + // it's already called. + onStop() + } + } } else { // Keep quiet to allow calling `stop` multiple times. } diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index 1026cb2aa7..47b535206c 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -203,4 +203,76 @@ class EventLoopSuite extends FunSuite with Timeouts { assert(!eventLoop.isActive) } } + + test("EventLoop: stop() in onStart should call onStop") { + @volatile var onStopCalled: Boolean = false + val eventLoop = new EventLoop[Int]("test") { + + override def onStart(): Unit = { + stop() + } + + override def onReceive(event: Int): Unit = { + } + + override def onError(e: Throwable): Unit = { + } + + override def onStop(): Unit = { + onStopCalled = true + } + } + eventLoop.start() + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + assert(onStopCalled) + } + + test("EventLoop: stop() in onReceive should call onStop") { + @volatile var onStopCalled: Boolean = false + val eventLoop = new EventLoop[Int]("test") { + + override def onReceive(event: Int): Unit = { + stop() + } + + override def onError(e: Throwable): Unit = { + } + + override def onStop(): Unit = { + onStopCalled = true + } + } + eventLoop.start() + eventLoop.post(1) + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + assert(onStopCalled) + } + + test("EventLoop: stop() in onError should call onStop") { + @volatile var onStopCalled: Boolean = false + val eventLoop = new EventLoop[Int]("test") { + + override def onReceive(event: Int): Unit = { + throw new RuntimeException("Oops") + } + + override def onError(e: Throwable): Unit = { + stop() + } + + override def onStop(): Unit = { + onStopCalled = true + } + } + eventLoop.start() + eventLoop.post(1) + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + assert(onStopCalled) + } } |