aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-04-18 10:14:56 +0100
committerSean Owen <sowen@cloudera.com>2015-04-18 10:14:56 +0100
commit327ebf0cb5e236579bece057eda27b21aed0e2dc (patch)
tree26d0313d3d96bb2d6304e9d214241ee36ed237b2 /core
parent5f095d56054d57c54d81db1d36cd46312810fb6a (diff)
downloadspark-327ebf0cb5e236579bece057eda27b21aed0e2dc.tar.gz
spark-327ebf0cb5e236579bece057eda27b21aed0e2dc.tar.bz2
spark-327ebf0cb5e236579bece057eda27b21aed0e2dc.zip
[core] [minor] Make sure ConnectionManager stops.
My previous fix (force a selector wakeup) didn't seem to work since I ran into the hang again. So change the code a bit to be more explicit about the condition when the selector thread should exit. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #5566 from vanzin/conn-mgr-hang and squashes the following commits: ddb2c03 [Marcelo Vanzin] [core] [minor] Make sure ConnectionManager stops.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala16
1 files changed, 10 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 5a74c13b38..1a68e621ea 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -188,6 +188,7 @@ private[nio] class ConnectionManager(
private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
+ @volatile private var isActive = true
private val selectorThread = new Thread("connection-manager-thread") {
override def run(): Unit = ConnectionManager.this.run()
}
@@ -342,7 +343,7 @@ private[nio] class ConnectionManager(
def run() {
try {
- while(!selectorThread.isInterrupted) {
+ while (isActive) {
while (!registerRequests.isEmpty) {
val conn: SendingConnection = registerRequests.dequeue()
addListeners(conn)
@@ -398,7 +399,7 @@ private[nio] class ConnectionManager(
} catch {
// Explicitly only dealing with CancelledKeyException here since other exceptions
// should be dealt with differently.
- case e: CancelledKeyException => {
+ case e: CancelledKeyException =>
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()
@@ -420,8 +421,11 @@ private[nio] class ConnectionManager(
}
}
}
- }
- 0
+ 0
+
+ case e: ClosedSelectorException =>
+ logDebug("Failed select() as selector is closed.", e)
+ return
}
if (selectedKeysCount == 0) {
@@ -988,11 +992,11 @@ private[nio] class ConnectionManager(
}
def stop() {
+ isActive = false
ackTimeoutMonitor.stop()
- selector.wakeup()
+ selector.close()
selectorThread.interrupt()
selectorThread.join()
- selector.close()
val connections = connectionsByKey.values
connections.foreach(_.close())
if (connectionsByKey.size != 0) {