author     Reynold Xin <rxin@apache.org>          2014-07-26 15:00:32 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-07-26 15:00:32 -0700
commit     12901643b7e808aa75cf0b19e2d0c3d40b1a978d (patch)
tree       fa7bf5b62029f01e9ec2ea9051323c49ca00d08e /core/src/main/scala
parent     c183b92c3c70ad2d36a2d60bdb10c02b65bc0212 (diff)
[SPARK-2704] Name threads in ConnectionManager and mark them as daemon.

handleMessageExecutor, handleReadWriteExecutor, and handleConnectExecutor are not marked as daemon and are not named. I think there exists some condition in which Spark programs won't terminate because of this. Stack dump attached in https://issues.apache.org/jira/browse/SPARK-2704

Author: Reynold Xin <rxin@apache.org>

Closes #1604 from rxin/daemon and squashes the following commits:

98d6a6c [Reynold Xin] [SPARK-2704] Name threads in ConnectionManager and mark them as daemon.
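For reference, a minimal standalone sketch of the pattern this commit applies, built on Guava's ThreadFactoryBuilder just as the patched code is; the pool sizes below are illustrative, not Spark's configured defaults:

import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Build a factory whose threads are daemons (they never keep the JVM alive
// on their own) and are named "<prefix>-0", "<prefix>-1", ...
def namedThreadFactory(prefix: String): ThreadFactory =
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()

// Wire the factory into a pool, as the diff below does for each executor.
val pool = new ThreadPoolExecutor(
  4, 32, 60L, TimeUnit.SECONDS,
  new LinkedBlockingDeque[Runnable](),
  namedThreadFactory("handle-read-write-executor"))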
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ConnectionManager.scala   9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                  27

2 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 8a1cdb8129..566e8a4aaa 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -62,13 +62,15 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
conf.getInt("spark.core.connection.handler.threads.min", 20),
conf.getInt("spark.core.connection.handler.threads.max", 60),
conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable]())
+ new LinkedBlockingDeque[Runnable](),
+ Utils.namedThreadFactory("handle-message-executor"))
private val handleReadWriteExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.io.threads.min", 4),
conf.getInt("spark.core.connection.io.threads.max", 32),
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable]())
+ new LinkedBlockingDeque[Runnable](),
+ Utils.namedThreadFactory("handle-read-write-executor"))
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
// which should be executed asap
@@ -76,7 +78,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
conf.getInt("spark.core.connection.connect.threads.min", 1),
conf.getInt("spark.core.connection.connect.threads.max", 8),
conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable]())
+ new LinkedBlockingDeque[Runnable](),
+ Utils.namedThreadFactory("handle-connect-executor"))
private val serverChannel = ServerSocketChannel.open()
// used to track the SendingConnections waiting to do SASL negotiation
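As a quick sanity check of the new helper (introduced in Utils.scala below), a factory's threads come out daemonized and sequentially named; this snippet is illustrative, assuming Utils is on the classpath:

val factory = Utils.namedThreadFactory("handle-connect-executor")
val t = factory.newThread(new Runnable { def run(): Unit = () })
assert(t.isDaemon)                                // won't block JVM exit
assert(t.getName == "handle-connect-executor-0")  // Guava's counter starts at 0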
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a4f4eba98..8cbb9050f3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
import java.nio.ByteBuffer
import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.collection.Map
@@ -553,19 +553,19 @@ private[spark] object Utils extends Logging {
new ThreadFactoryBuilder().setDaemon(true)
/**
- * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
- * unique, sequentially assigned integer.
+ * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
*/
- def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
- val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
- Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+ def namedThreadFactory(prefix: String): ThreadFactory = {
+ daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
}
/**
- * Return the string to tell how long has passed in milliseconds.
+ * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
*/
- def getUsedTimeMs(startTimeMs: Long): String = {
- " " + (System.currentTimeMillis - startTimeMs) + " ms"
+ def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+ val threadFactory = namedThreadFactory(prefix)
+ Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
/**
@@ -573,10 +573,17 @@ private[spark] object Utils extends Logging {
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
- val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+ val threadFactory = namedThreadFactory(prefix)
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
}
+ /**
+ * Return the string to tell how long has passed in milliseconds.
+ */
+ def getUsedTimeMs(startTimeMs: Long): String = {
+ " " + (System.currentTimeMillis - startTimeMs) + " ms"
+ }
+
private def listFilesSafely(file: File): Seq[File] = {
val files = file.listFiles()
if (files == null) {