diff options
author | zsxwing <zsxwing@gmail.com> | 2015-04-02 22:54:30 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-04-02 22:55:01 -0700 |
commit | ac705aa83632622aecd641f7c8619cd78a3cad74 (patch) | |
tree | a85c43a7068301e3af8d88bc1b36940c30ebda0f | |
parent | d21f779887c5af8cab76809e2dfc9dd5bcc42eab (diff) | |
download | spark-ac705aa83632622aecd641f7c8619cd78a3cad74.tar.gz spark-ac705aa83632622aecd641f7c8619cd78a3cad74.tar.bz2 spark-ac705aa83632622aecd641f7c8619cd78a3cad74.zip |
[SPARK-6621][Core] Fix the bug that calling EventLoop.stop in EventLoop.onReceive/onError/onStart doesn't call onStop
Author: zsxwing <zsxwing@gmail.com>
Closes #5280 from zsxwing/SPARK-6621 and squashes the following commits:
521125e [zsxwing] Fix the bug that calling EventLoop.stop in EventLoop.onReceive and EventLoop.onError doesn't call onStop
(cherry picked from commit 440ea31b76aa7e813436271fd63880c7bcd69157)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/EventLoop.scala | 18 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala | 72 |
2 files changed, 87 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index b0ed908b84..e9b2b8d24b 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ -76,9 +76,21 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { def stop(): Unit = { if (stopped.compareAndSet(false, true)) { eventThread.interrupt() - eventThread.join() - // Call onStop after the event thread exits to make sure onReceive happens before onStop - onStop() + var onStopCalled = false + try { + eventThread.join() + // Call onStop after the event thread exits to make sure onReceive happens before onStop + onStopCalled = true + onStop() + } catch { + case ie: InterruptedException => + Thread.currentThread().interrupt() + if (!onStopCalled) { + // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since + // it's already called. + onStop() + } + } } else { // Keep quiet to allow calling `stop` multiple times. } diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index 1026cb2aa7..47b535206c 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -203,4 +203,76 @@ class EventLoopSuite extends FunSuite with Timeouts { assert(!eventLoop.isActive) } } + + test("EventLoop: stop() in onStart should call onStop") { + @volatile var onStopCalled: Boolean = false + val eventLoop = new EventLoop[Int]("test") { + + override def onStart(): Unit = { + stop() + } + + override def onReceive(event: Int): Unit = { + } + + override def onError(e: Throwable): Unit = { + } + + override def onStop(): Unit = { + onStopCalled = true + } + } + eventLoop.start() + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + assert(onStopCalled) + } + + test("EventLoop: stop() in onReceive should call onStop") { + @volatile var onStopCalled: Boolean = false + val eventLoop = new EventLoop[Int]("test") { + + override def onReceive(event: Int): Unit = { + stop() + } + + override def onError(e: Throwable): Unit = { + } + + override def onStop(): Unit = { + onStopCalled = true + } + } + eventLoop.start() + eventLoop.post(1) + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + assert(onStopCalled) + } + + test("EventLoop: stop() in onError should call onStop") { + @volatile var onStopCalled: Boolean = false + val eventLoop = new EventLoop[Int]("test") { + + override def onReceive(event: Int): Unit = { + throw new RuntimeException("Oops") + } + + override def onError(e: Throwable): Unit = { + stop() + } + + override def onStop(): Unit = { + onStopCalled = true + } + } + eventLoop.start() + eventLoop.post(1) + eventually(timeout(5 seconds), interval(5 millis)) { + assert(!eventLoop.isActive) + } + assert(onStopCalled) + } } |