aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-22 11:08:59 -0700
committerReynold Xin <rxin@databricks.com>2015-04-22 11:08:59 -0700
commit33b85620f910c404873d362d27cca1223084913a (patch)
treeda1003de30e34743e99fe0ffc38a8d7d8ba009d6 /core
parentbdc5c16e76c5d0bc147408353b2ba4faa8e914fc (diff)
downloadspark-33b85620f910c404873d362d27cca1223084913a.tar.gz
spark-33b85620f910c404873d362d27cca1223084913a.tar.bz2
spark-33b85620f910c404873d362d27cca1223084913a.zip
[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176 What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time. Author: zsxwing <zsxwing@gmail.com> Closes #5631 from zsxwing/thread-utils and squashes the following commits: 9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/ThreadUtils.scala67
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala29
-rw-r--r--core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala57
15 files changed, 161 insertions, 68 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 4e7bf51fc0..b986fa87dc 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,12 +17,12 @@
package org.apache.spark
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit
import scala.collection.mutable
import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager(
private val listener = new ExecutorAllocationListener
// Executor that handles the scheduling task.
- private val executor = Executors.newSingleThreadScheduledExecutor(
- Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+ private val executor =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
/**
* Verify that the settings specified through the config are valid.
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index e3bd16f1cb..68d05d5b02 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
@@ -25,7 +25,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
private var timeoutCheckingTask: ScheduledFuture[_] = null
- private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
- Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+ private val timeoutCheckingThread =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
- private val killExecutorThread = Executors.newSingleThreadExecutor(
- Utils.namedThreadFactory("kill-executor-thread"))
+ private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
override def onStart(): Unit = {
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 47bdd7749e..9847d5944a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private val replayExecutor: ExecutorService = {
if (!conf.contains("spark.testing")) {
- Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+ ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
} else {
MoreExecutors.sameThreadExecutor()
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 327d155b38..5fc04df5d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.lang.management.ManagementFactory
import java.net.URL
import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -76,7 +76,7 @@ private[spark] class Executor(
}
// Start worker thread pool
- private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
+ private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
if (!isLocal) {
@@ -110,8 +110,7 @@ private[spark] class Executor(
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
// Executor for the heartbeat task.
- private val heartbeater = Executors.newSingleThreadScheduledExecutor(
- Utils.namedThreadFactory("driver-heartbeater"))
+ private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
startDriverHeartbeater()
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 1a68e621ea..16e905982c 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -36,7 +36,7 @@ import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
import org.apache.spark._
import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
import scala.util.Try
import scala.util.control.NonFatal
@@ -79,7 +79,7 @@ private[nio] class ConnectionManager(
private val selector = SelectorProvider.provider.openSelector()
private val ackTimeoutMonitor =
- new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
+ new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor"))
private val ackTimeout =
conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
@@ -102,7 +102,7 @@ private[nio] class ConnectionManager(
handlerThreadCount,
conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
- Utils.namedThreadFactory("handle-message-executor")) {
+ ThreadUtils.namedThreadFactory("handle-message-executor")) {
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
@@ -117,7 +117,7 @@ private[nio] class ConnectionManager(
ioThreadCount,
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
- Utils.namedThreadFactory("handle-read-write-executor")) {
+ ThreadUtils.namedThreadFactory("handle-read-write-executor")) {
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
@@ -134,7 +134,7 @@ private[nio] class ConnectionManager(
connectThreadCount,
conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
- Utils.namedThreadFactory("handle-connect-executor")) {
+ ThreadUtils.namedThreadFactory("handle-connect-executor")) {
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
@@ -160,7 +160,7 @@ private[nio] class ConnectionManager(
private val registerRequests = new SynchronizedQueue[SendingConnection]
implicit val futureExecContext = ExecutionContext.fromExecutor(
- Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
+ ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context"))
@volatile
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4a32f8936f..8c4bff4e83 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.util.Properties
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
@@ -129,7 +129,7 @@ class DAGScheduler(
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
private val messageScheduler =
- Executors.newScheduledThreadPool(1, Utils.namedThreadFactory("dag-scheduler-message"))
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 3938580aee..391827c1d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
@@ -35,7 +35,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
extends Logging {
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
- private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+ private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
THREADS, "task-result-getter")
protected val serializer = new ThreadLocal[SerializerInstance] {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 63987dfb32..9656fb7685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -26,7 +26,7 @@ import org.apache.spark.rpc._
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val addressToExecutorId = new HashMap[RpcAddress, String]
private val reviveThread =
- Executors.newSingleThreadScheduledExecutor(Utils.namedThreadFactory("driver-revive-thread"))
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1406a36a66..d987c7d563 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
import scala.util.control.NonFatal
@@ -97,7 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
private var amEndpoint: Option[RpcEndpointRef] = None
private val askAmThreadPool =
- Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
+ ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
override def receive: PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 50ba0b9d5a..ac5b524517 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,14 @@
package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
private case class ReviveOffers()
@@ -47,8 +47,8 @@ private[spark] class LocalEndpoint(
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging {
- private val reviveThread = Executors.newSingleThreadScheduledExecutor(
- Utils.namedThreadFactory("local-revive-thread"))
+ private val reviveThread =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
private var freeCores = totalCores
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 28c73a7d54..4682167912 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
@@ -51,7 +51,7 @@ class BlockManagerMasterEndpoint(
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- private val askThreadPool = Utils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
+ private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 8980fa8eb7..543df4e135 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
import org.apache.spark.storage.BlockManagerMessages._
@@ -36,7 +36,7 @@ class BlockManagerSlaveEndpoint(
extends RpcEndpoint with Logging {
private val asyncThreadPool =
- Utils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
+ ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
// Operations that involve removing blocks may be slow and should be done asynchronously
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
new file mode 100644
index 0000000000..098a4b7949
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.util
+
+import java.util.concurrent._
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+private[spark] object ThreadUtils {
+
+ /**
+ * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+ */
+ def namedThreadFactory(prefix: String): ThreadFactory = {
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+ }
+
+ /**
+ * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
+ */
+ def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+ val threadFactory = namedThreadFactory(prefix)
+ Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
+
+ /**
+ * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
+ */
+ def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+ val threadFactory = namedThreadFactory(prefix)
+ Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
+
+ /**
+ * Wrapper over newSingleThreadExecutor.
+ */
+ def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+ val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+ Executors.newSingleThreadExecutor(threadFactory)
+ }
+
+ /**
+ * Wrapper over newSingleThreadScheduledExecutor.
+ */
+ def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+ val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+ Executors.newSingleThreadScheduledExecutor(threadFactory)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7b0de1ae55..2feb7341b1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,7 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.{ByteStreams, Files}
import com.google.common.net.InetAddresses
-import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -897,34 +896,6 @@ private[spark] object Utils extends Logging {
hostPortParseResults.get(hostPort)
}
- private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
- new ThreadFactoryBuilder().setDaemon(true)
-
- /**
- * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
- */
- def namedThreadFactory(prefix: String): ThreadFactory = {
- daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
- }
-
- /**
- * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
- * unique, sequentially assigned integer.
- */
- def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
- val threadFactory = namedThreadFactory(prefix)
- Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
- }
-
- /**
- * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
- * unique, sequentially assigned integer.
- */
- def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
- val threadFactory = namedThreadFactory(prefix)
- Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
- }
-
/**
* Return the string to tell how long has passed in milliseconds.
*/
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
new file mode 100644
index 0000000000..a3aa3e953f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.util
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.scalatest.FunSuite
+
+class ThreadUtilsSuite extends FunSuite {
+
+ test("newDaemonSingleThreadExecutor") {
+ val executor = ThreadUtils.newDaemonSingleThreadExecutor("this-is-a-thread-name")
+ @volatile var threadName = ""
+ executor.submit(new Runnable {
+ override def run(): Unit = {
+ threadName = Thread.currentThread().getName()
+ }
+ })
+ executor.shutdown()
+ executor.awaitTermination(10, TimeUnit.SECONDS)
+ assert(threadName === "this-is-a-thread-name")
+ }
+
+ test("newDaemonSingleThreadScheduledExecutor") {
+ val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("this-is-a-thread-name")
+ try {
+ val latch = new CountDownLatch(1)
+ @volatile var threadName = ""
+ executor.schedule(new Runnable {
+ override def run(): Unit = {
+ threadName = Thread.currentThread().getName()
+ latch.countDown()
+ }
+ }, 1, TimeUnit.MILLISECONDS)
+ latch.await(10, TimeUnit.SECONDS)
+ assert(threadName === "this-is-a-thread-name")
+ } finally {
+ executor.shutdownNow()
+ }
+ }
+}