diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 03c4137ca0..ee22c6656e 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -184,14 +184,16 @@ private[nio] class ConnectionManager( // to be able to track asynchronous messages private val idCount: AtomicInteger = new AtomicInteger(1) + private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() + private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() + private val selectorThread = new Thread("connection-manager-thread") { override def run() = ConnectionManager.this.run() } selectorThread.setDaemon(true) + // start this thread last, since it invokes run(), which accesses members above selectorThread.start() - private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() - private def triggerWrite(key: SelectionKey) { val conn = connectionsByKey.getOrElse(key, null) if (conn == null) return @@ -232,7 +234,6 @@ private[nio] class ConnectionManager( } ) } - private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() private def triggerRead(key: SelectionKey) { val conn = connectionsByKey.getOrElse(key, null) |