aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-10-15 14:20:27 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2013-10-15 14:23:43 -0700
commit707ad8cc4fe9bd65c5bd20b251efbdbe1d00c1ba (patch)
tree653d143c6588823cb99c6176e6b2ca234d13ddf9 /core
parent3249e0e90dd9a7b422f561c42407b6a2b3feab17 (diff)
downloadspark-707ad8cc4fe9bd65c5bd20b251efbdbe1d00c1ba.tar.gz
spark-707ad8cc4fe9bd65c5bd20b251efbdbe1d00c1ba.tar.bz2
spark-707ad8cc4fe9bd65c5bd20b251efbdbe1d00c1ba.zip
Unified daemon thread pools
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala22
7 files changed, 29 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index b6c484bfe1..5332510e87 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private var blocksInRequestBitVector = new BitSet(totalBlocks)
override def run() {
- var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+ var threadPool = Utils.newDaemonFixedThreadPool(
+ MultiTracker.MaxChatSlots, "Bit Torrent Chatter")
while (hasBlocks.get < totalBlocks) {
var numThreadsToCreate = 0
@@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private var setOfCompletedSources = Set[SourceInfo]()
override def run() {
- var threadPool = Utils.newDaemonCachedThreadPool()
+ var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests")
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
class ServeMultipleRequests
extends Thread with Logging {
// Server at most MultiTracker.MaxChatSlots peers
- var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+ var threadPool = Utils.newDaemonFixedThreadPool(
+ MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests")
override def run() {
var serverSocket = new ServerSocket(0)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
index 21ec94659e..82ed64f190 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
@@ -137,7 +137,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
override def run() {
- var threadPool = Utils.newDaemonCachedThreadPool()
+ var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values")
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(DriverTrackerPort)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index e6674d49a7..51af80a35e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable {
private var setOfCompletedSources = Set[SourceInfo]()
override def run() {
- var threadPool = Utils.newDaemonCachedThreadPool()
+ var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests")
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable {
class ServeMultipleRequests
extends Thread with Logging {
- var threadPool = Utils.newDaemonCachedThreadPool()
+ var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests")
override def run() {
var serverSocket = new ServerSocket(0)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 20323ea038..032eb04f43 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,8 +121,7 @@ private[spark] class Executor(
}
// Start worker thread pool
- val threadPool = new ThreadPoolExecutor(
- 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
+ val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index e15a839c4e..9c2fee4023 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
private val registerRequests = new SynchronizedQueue[SendingConnection]
- implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
+ implicit val futureExecContext = ExecutionContext.fromExecutor(
+ Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index feec8ecfe4..4312c46cc1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -24,33 +24,16 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.Utils
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends Logging {
- private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
- private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
- private val getTaskResultExecutor = new ThreadPoolExecutor(
- MIN_THREADS,
- MAX_THREADS,
- 0L,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable],
- new ResultResolverThreadFactory)
-
- class ResultResolverThreadFactory extends ThreadFactory {
- private var counter = 0
- private var PREFIX = "Result resolver thread"
-
- override def newThread(r: Runnable): Thread = {
- val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
- counter += 1
- thread.setDaemon(true)
- return thread
- }
- }
+ private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
+ private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+ THREADS, "Result resolver thread")
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f384875cc9..a3b3968c5e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -447,14 +447,17 @@ private[spark] object Utils extends Logging {
hostPortParseResults.get(hostPort)
}
- private[spark] val daemonThreadFactory: ThreadFactory =
- new ThreadFactoryBuilder().setDaemon(true).build()
+ private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
+ new ThreadFactoryBuilder().setDaemon(true)
/**
- * Wrapper over newCachedThreadPool.
+ * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
*/
- def newDaemonCachedThreadPool(): ThreadPoolExecutor =
- Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+ def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+ val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+ Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
/**
* Return the string to tell how long has passed in seconds. The passing parameter should be in
@@ -465,10 +468,13 @@ private[spark] object Utils extends Logging {
}
/**
- * Wrapper over newFixedThreadPool.
+ * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
*/
- def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
- Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+ def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+ val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+ Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
private def listFilesSafely(file: File): Seq[File] = {
val files = file.listFiles()