diff options
author | Ilya Ganelin <ilya.ganelin@capitalone.com> | 2014-12-18 12:53:18 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-12-18 12:53:18 -0800 |
commit | 3720057b8e7c15c2c0464b5bb7243bc22323f4e8 (patch) | |
tree | 1f3e3543c0e3e9c35e9bebe58b6f125fe9e02f03 | |
parent | d9956f86ad7a937c5f2cfe39eacdcbdad9356c30 (diff) | |
download | spark-3720057b8e7c15c2c0464b5bb7243bc22323f4e8.tar.gz spark-3720057b8e7c15c2c0464b5bb7243bc22323f4e8.tar.bz2 spark-3720057b8e7c15c2c0464b5bb7243bc22323f4e8.zip |
[SPARK-3607] ConnectionManager threads.max configs on the thread pools don't work
Hi all - cleaned up the code to get rid of the unused parameter and added some discussion of the ThreadPoolExecutor parameters to explain why we can use a single threadCount instead of providing a min/max.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Closes #3664 from ilganeli/SPARK-3607C and squashes the following commits:
3c05690 [Ilya Ganelin] Updated documentation and refactored code to extract shared variables
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index df4b085d22..243b71c980 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -83,9 +83,21 @@ private[nio] class ConnectionManager( private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) + // Get the thread counts from the Spark Configuration. + // + // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value, + // we only query for the minimum value because we are using LinkedBlockingDeque. + // + // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is + // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min" + // parameter is necessary. + private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20) + private val ioThreadCount = conf.getInt("spark.core.connection.io.threads.min", 4) + private val connectThreadCount = conf.getInt("spark.core.connection.connect.threads.min", 1) + private val handleMessageExecutor = new ThreadPoolExecutor( - conf.getInt("spark.core.connection.handler.threads.min", 20), - conf.getInt("spark.core.connection.handler.threads.max", 60), + handlerThreadCount, + handlerThreadCount, conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](), Utils.namedThreadFactory("handle-message-executor")) { @@ -96,12 +108,11 @@ private[nio] class ConnectionManager( logError("Error in handleMessageExecutor is not handled properly", t) } } - } private val handleReadWriteExecutor = new ThreadPoolExecutor( - conf.getInt("spark.core.connection.io.threads.min", 4), - conf.getInt("spark.core.connection.io.threads.max", 32), + ioThreadCount, + ioThreadCount, conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](), Utils.namedThreadFactory("handle-read-write-executor")) { @@ -112,14 +123,13 @@ private[nio] class ConnectionManager( logError("Error in handleReadWriteExecutor is not handled properly", t) } } - } // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : // which should be executed asap private val handleConnectExecutor = new ThreadPoolExecutor( - conf.getInt("spark.core.connection.connect.threads.min", 1), - conf.getInt("spark.core.connection.connect.threads.max", 8), + connectThreadCount, + connectThreadCount, conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](), Utils.namedThreadFactory("handle-connect-executor")) { @@ -130,7 +140,6 @@ private[nio] class ConnectionManager( logError("Error in handleConnectExecutor is not handled properly", t) } } - } private val serverChannel = ServerSocketChannel.open() |