aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-21 23:31:00 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-21 23:31:00 -0800
commit551a47a620c7dc207e3530e54d794a3c3aa8e45e (patch)
tree6348d4c24cce6da6f698c665a90784a8f3247fea
parent7d3e359f2c463681cf0128da2c6692beb13dade9 (diff)
downloadspark-551a47a620c7dc207e3530e54d794a3c3aa8e45e.tar.gz
spark-551a47a620c7dc207e3530e54d794a3c3aa8e45e.tar.bz2
spark-551a47a620c7dc207e3530e54d794a3c3aa8e45e.zip
Refactor daemon thread pool creation.
-rw-r--r--core/src/main/scala/spark/DaemonThreadFactory.scala18
-rw-r--r--core/src/main/scala/spark/Utils.scala33
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala5
5 files changed, 13 insertions, 50 deletions
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
deleted file mode 100644
index 56e59adeb7..0000000000
--- a/core/src/main/scala/spark/DaemonThreadFactory.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package spark
-
-import java.util.concurrent.ThreadFactory
-
-/**
- * A ThreadFactory that creates daemon threads
- */
-private object DaemonThreadFactory extends ThreadFactory {
- override def newThread(r: Runnable): Thread = new DaemonThread(r)
-}
-
-private class DaemonThread(r: Runnable = null) extends Thread {
- override def run() {
- if (r != null) {
- r.run()
- }
- }
-} \ No newline at end of file
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 692a3f4050..9b8636f6c8 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -10,6 +10,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
import com.google.common.io.Files
+import com.google.common.util.concurrent.ThreadFactoryBuilder
/**
* Various utility methods used by Spark.
@@ -287,29 +288,14 @@ private object Utils extends Logging {
customHostname.getOrElse(InetAddress.getLocalHost.getHostName)
}
- /**
- * Returns a standard ThreadFactory except all threads are daemons.
- */
- private def newDaemonThreadFactory: ThreadFactory = {
- new ThreadFactory {
- def newThread(r: Runnable): Thread = {
- var t = Executors.defaultThreadFactory.newThread (r)
- t.setDaemon (true)
- return t
- }
- }
- }
+ private[spark] val daemonThreadFactory: ThreadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).build()
/**
* Wrapper over newCachedThreadPool.
*/
- def newDaemonCachedThreadPool(): ThreadPoolExecutor = {
- var threadPool = Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory (newDaemonThreadFactory)
-
- return threadPool
- }
+ def newDaemonCachedThreadPool(): ThreadPoolExecutor =
+ Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
/**
* Return the string to tell how long has passed in seconds. The passing parameter should be in
@@ -322,13 +308,8 @@ private object Utils extends Logging {
/**
* Wrapper over newFixedThreadPool.
*/
- def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
- var threadPool = Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory(newDaemonThreadFactory)
-
- return threadPool
- }
+ def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
+ Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
/**
* Delete a file or directory and its contents recursively.
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 36c01ad629..2ecd14f536 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -52,9 +52,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
val sendMessageRequests = new Queue[(Message, SendingConnection)]
- implicit val futureExecContext = ExecutionContext.fromExecutor(
- Executors.newCachedThreadPool(DaemonThreadFactory))
-
+ implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
+
var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
serverChannel.configureBlocking(false)
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index dff550036d..87f8474ea0 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -20,7 +20,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
with Logging {
var attemptId = new AtomicInteger(0)
- var threadPool = Executors.newFixedThreadPool(threads, DaemonThreadFactory)
+ var threadPool = Utils.newDaemonFixedThreadPool(threads)
val env = SparkEnv.get
var listener: TaskSchedulerListener = null
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 290fab1ce0..04e6b69b7b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.{DaemonThread, Logging}
+import spark.Logging
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
@@ -48,7 +48,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
val queue = new ArrayBlockingQueue[ByteBuffer](2)
- blockPushingThread = new DaemonThread {
+ blockPushingThread = new Thread {
+ setDaemon(true)
override def run() {
var nextBlockNumber = 0
while (true) {