aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-02 22:54:30 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-02 22:55:01 -0700
commitac705aa83632622aecd641f7c8619cd78a3cad74 (patch)
treea85c43a7068301e3af8d88bc1b36940c30ebda0f
parentd21f779887c5af8cab76809e2dfc9dd5bcc42eab (diff)
downloadspark-ac705aa83632622aecd641f7c8619cd78a3cad74.tar.gz
spark-ac705aa83632622aecd641f7c8619cd78a3cad74.tar.bz2
spark-ac705aa83632622aecd641f7c8619cd78a3cad74.zip
[SPARK-6621][Core] Fix the bug that calling EventLoop.stop in EventLoop.onReceive/onError/onStart doesn't call onStop
Author: zsxwing <zsxwing@gmail.com> Closes #5280 from zsxwing/SPARK-6621 and squashes the following commits: 521125e [zsxwing] Fix the bug that calling EventLoop.stop in EventLoop.onReceive and EventLoop.onError doesn't call onStop (cherry picked from commit 440ea31b76aa7e813436271fd63880c7bcd69157) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r--core/src/main/scala/org/apache/spark/util/EventLoop.scala18
-rw-r--r--core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala72
2 files changed, 87 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
index b0ed908b84..e9b2b8d24b 100644
--- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala
+++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
@@ -76,9 +76,21 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
def stop(): Unit = {
if (stopped.compareAndSet(false, true)) {
eventThread.interrupt()
- eventThread.join()
- // Call onStop after the event thread exits to make sure onReceive happens before onStop
- onStop()
+ var onStopCalled = false
+ try {
+ eventThread.join()
+ // Call onStop after the event thread exits to make sure onReceive happens before onStop
+ onStopCalled = true
+ onStop()
+ } catch {
+ case ie: InterruptedException =>
+ Thread.currentThread().interrupt()
+ if (!onStopCalled) {
+ // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
+ // it's already called.
+ onStop()
+ }
+ }
} else {
// Keep quiet to allow calling `stop` multiple times.
}
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index 1026cb2aa7..47b535206c 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -203,4 +203,76 @@ class EventLoopSuite extends FunSuite with Timeouts {
assert(!eventLoop.isActive)
}
}
+
+ test("EventLoop: stop() in onStart should call onStop") {
+ @volatile var onStopCalled: Boolean = false
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onStart(): Unit = {
+ stop()
+ }
+
+ override def onReceive(event: Int): Unit = {
+ }
+
+ override def onError(e: Throwable): Unit = {
+ }
+
+ override def onStop(): Unit = {
+ onStopCalled = true
+ }
+ }
+ eventLoop.start()
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ assert(onStopCalled)
+ }
+
+ test("EventLoop: stop() in onReceive should call onStop") {
+ @volatile var onStopCalled: Boolean = false
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onReceive(event: Int): Unit = {
+ stop()
+ }
+
+ override def onError(e: Throwable): Unit = {
+ }
+
+ override def onStop(): Unit = {
+ onStopCalled = true
+ }
+ }
+ eventLoop.start()
+ eventLoop.post(1)
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ assert(onStopCalled)
+ }
+
+ test("EventLoop: stop() in onError should call onStop") {
+ @volatile var onStopCalled: Boolean = false
+ val eventLoop = new EventLoop[Int]("test") {
+
+ override def onReceive(event: Int): Unit = {
+ throw new RuntimeException("Oops")
+ }
+
+ override def onError(e: Throwable): Unit = {
+ stop()
+ }
+
+ override def onStop(): Unit = {
+ onStopCalled = true
+ }
+ }
+ eventLoop.start()
+ eventLoop.post(1)
+ eventually(timeout(5 seconds), interval(5 millis)) {
+ assert(!eventLoop.isActive)
+ }
+ assert(onStopCalled)
+ }
}