diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-04-18 10:14:56 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-04-18 10:14:56 +0100 |
commit | 327ebf0cb5e236579bece057eda27b21aed0e2dc (patch) | |
tree | 26d0313d3d96bb2d6304e9d214241ee36ed237b2 | |
parent | 5f095d56054d57c54d81db1d36cd46312810fb6a (diff) | |
download | spark-327ebf0cb5e236579bece057eda27b21aed0e2dc.tar.gz spark-327ebf0cb5e236579bece057eda27b21aed0e2dc.tar.bz2 spark-327ebf0cb5e236579bece057eda27b21aed0e2dc.zip |
[core] [minor] Make sure ConnectionManager stops.
My previous fix (force a selector wakeup) didn't seem to work since
I ran into the hang again. So change the code a bit to be more
explicit about the condition when the selector thread should exit.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #5566 from vanzin/conn-mgr-hang and squashes the following commits:
ddb2c03 [Marcelo Vanzin] [core] [minor] Make sure ConnectionManager stops.
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 5a74c13b38..1a68e621ea 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -188,6 +188,7 @@ private[nio] class ConnectionManager( private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() + @volatile private var isActive = true private val selectorThread = new Thread("connection-manager-thread") { override def run(): Unit = ConnectionManager.this.run() } @@ -342,7 +343,7 @@ private[nio] class ConnectionManager( def run() { try { - while(!selectorThread.isInterrupted) { + while (isActive) { while (!registerRequests.isEmpty) { val conn: SendingConnection = registerRequests.dequeue() addListeners(conn) @@ -398,7 +399,7 @@ private[nio] class ConnectionManager( } catch { // Explicitly only dealing with CancelledKeyException here since other exceptions // should be dealt with differently. - case e: CancelledKeyException => { + case e: CancelledKeyException => // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() @@ -420,8 +421,11 @@ private[nio] class ConnectionManager( } } } - } - 0 + 0 + + case e: ClosedSelectorException => + logDebug("Failed select() as selector is closed.", e) + return } if (selectedKeysCount == 0) { @@ -988,11 +992,11 @@ private[nio] class ConnectionManager( } def stop() { + isActive = false ackTimeoutMonitor.stop() - selector.wakeup() + selector.close() selectorThread.interrupt() selectorThread.join() - selector.close() val connections = connectionsByKey.values connections.foreach(_.close()) if (connectionsByKey.size != 0) { |