author     Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 22:13:01 -0700
committer  Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 22:13:01 -0700
commit     60d1121343bad0c7d5e39a53c86f5822334583a4 (patch)
tree       cd2b4a97a1590ddd48763db5f9b6733a767f9277 /core
parent     e898e108a3b505cf56617885f63b89732ff85abc (diff)
download   spark-60d1121343bad0c7d5e39a53c86f5822334583a4.tar.gz
           spark-60d1121343bad0c7d5e39a53c86f5822334583a4.tar.bz2
           spark-60d1121343bad0c7d5e39a53c86f5822334583a4.zip
Refactoring: the daemonThreadFactory helpers have all been moved into the Utils
object instead of being duplicated across the Broadcast and Shuffle objects.
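
After this change, every call site obtains its daemon thread pools from the
single Utils implementation. A minimal sketch of how the consolidated helpers
are used from caller code (the object name ThreadPoolDemo and the pool size 4
are illustrative assumptions, not part of the commit):

    import java.util.concurrent.ThreadPoolExecutor
    import spark.Utils

    // Hypothetical caller: stands in for the Broadcast/Shuffle call sites
    // that previously used their own private copies of these helpers.
    object ThreadPoolDemo {
      def main(args: Array[String]): Unit = {
        // Cached pool whose threads are all daemons
        val cached: ThreadPoolExecutor = Utils.newDaemonCachedThreadPool()

        // Fixed-size daemon pool; 4 is an arbitrary size for illustration
        val fixed: ThreadPoolExecutor = Utils.newDaemonFixedThreadPool(4)

        cached.execute(new Runnable {
          def run(): Unit = println("running on a daemon thread")
        })

        cached.shutdown()
        fixed.shutdown()
      }
    }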
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Utils.scala                                  | 45
-rw-r--r--  core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala          |  8
-rw-r--r--  core/src/main/scala/spark/broadcast/Broadcast.scala                    | 31
-rw-r--r--  core/src/main/scala/spark/broadcast/ChainedBroadcast.scala             |  6
-rw-r--r--  core/src/main/scala/spark/broadcast/TreeBroadcast.scala                |  6
-rw-r--r--  core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala  |  5
-rw-r--r--  core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala |  5
-rw-r--r--  core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala   |  3
-rw-r--r--  core/src/main/scala/spark/shuffle/Shuffle.scala                         | 31
9 files changed, 54 insertions(+), 86 deletions(-)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index e333dd9c91..51312d7f67 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -3,6 +3,7 @@ package spark
import java.io._
import java.net.InetAddress
import java.util.UUID
+import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -117,11 +118,43 @@ object Utils {
/**
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4)
*/
- def localIpAddress(): String = {
- // Get local IP as an array of four bytes
- val bytes = InetAddress.getLocalHost().getAddress()
- // Convert the bytes to ints (keeping in mind that they may be negative)
- // and join them into a string
- return bytes.map(b => (b.toInt + 256) % 256).mkString(".")
+ def localIpAddress(): String = InetAddress.getLocalHost.getHostAddress
+
+ /**
+ * Returns a standard ThreadFactory except all threads are daemons
+ */
+ private def newDaemonThreadFactory: ThreadFactory = {
+ new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ var t = Executors.defaultThreadFactory.newThread (r)
+ t.setDaemon (true)
+ return t
+ }
+ }
+ }
+
+ /**
+ * Wrapper over newCachedThreadPool
+ */
+ def newDaemonCachedThreadPool(): ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
+
+ /**
+ * Wrapper over newFixedThreadPool
+ */
+ def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory(newDaemonThreadFactory)
+
+ return threadPool
}
+
}
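
For context on why the factory marks threads as daemons: daemon threads do not
keep the JVM alive, so pools built this way never block process exit even if
they are never shut down. A standalone sketch of the same pattern
(DaemonExitDemo is a made-up name; nothing below is part of the commit):

    import java.util.concurrent.{Executors, ThreadFactory}

    object DaemonExitDemo {
      def main(args: Array[String]): Unit = {
        // Same factory pattern as Utils.newDaemonThreadFactory above
        val factory = new ThreadFactory {
          def newThread(r: Runnable): Thread = {
            val t = Executors.defaultThreadFactory.newThread(r)
            t.setDaemon(true) // daemon threads never block JVM shutdown
            t
          }
        }
        val pool = Executors.newCachedThreadPool(factory)
        pool.execute(new Runnable {
          def run(): Unit = while (true) Thread.sleep(1000) // never finishes
        })
        println("main exiting; the daemon pool does not keep the JVM alive")
        // Deliberately no pool.shutdown(): the process still terminates
        // because every pool thread is a daemon.
      }
    }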
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index cc5434789b..3cf9461531 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -447,7 +447,7 @@ extends Broadcast[T] with Logging {
private var blocksInRequestBitVector = new BitSet(totalBlocks)
override def run: Unit = {
- var threadPool = Broadcast.newDaemonFixedThreadPool(Broadcast.MaxRxSlots)
+ var threadPool = Utils.newDaemonFixedThreadPool(Broadcast.MaxRxSlots)
while (hasBlocks.get < totalBlocks) {
var numThreadsToCreate =
@@ -862,7 +862,7 @@ extends Broadcast[T] with Logging {
private var setOfCompletedSources = Set[SourceInfo]()
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -1059,7 +1059,7 @@ extends Broadcast[T] with Logging {
class ServeMultipleRequests
extends Thread with Logging {
// Serve at most Broadcast.MaxTxSlots peers
- var threadPool = Broadcast.newDaemonFixedThreadPool(Broadcast.MaxTxSlots)
+ var threadPool = Utils.newDaemonFixedThreadPool(Broadcast.MaxTxSlots)
override def run: Unit = {
var serverSocket = new ServerSocket(0)
@@ -1247,7 +1247,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index d2d0c85621..1c4813481e 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -129,37 +129,6 @@ extends Logging {
def EndGameFraction = EndGameFraction_
- // Returns a standard ThreadFactory except all threads are daemons
- private def newDaemonThreadFactory: ThreadFactory = {
- new ThreadFactory {
- def newThread(r: Runnable): Thread = {
- var t = Executors.defaultThreadFactory.newThread (r)
- t.setDaemon (true)
- return t
- }
- }
- }
-
- // Wrapper over newCachedThreadPool
- def newDaemonCachedThreadPool: ThreadPoolExecutor = {
- var threadPool =
- Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory (newDaemonThreadFactory)
-
- return threadPool
- }
-
- // Wrapper over newFixedThreadPool
- def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
- var threadPool =
- Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory (newDaemonThreadFactory)
-
- return threadPool
- }
-
// Helper functions to convert an object to Array[BroadcastBlock]
def blockifyObject[IN](obj: IN): VariableInfo = {
val baos = new ByteArrayOutputStream
diff --git a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
index f43b50bdb9..062f048279 100644
--- a/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/ChainedBroadcast.scala
@@ -351,7 +351,7 @@ extends Broadcast[T] with Logging {
private var setOfCompletedSources = Set[SourceInfo]()
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -558,7 +558,7 @@ extends Broadcast[T] with Logging {
class ServeMultipleRequests
extends Thread with Logging {
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -731,7 +731,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 867795dbf2..7540e7f47c 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -355,7 +355,7 @@ extends Broadcast[T] with Logging {
private var setOfCompletedSources = Set[SourceInfo]()
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -571,7 +571,7 @@ extends Broadcast[T] with Logging {
class ServeMultipleRequests
extends Thread with Logging {
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@@ -746,7 +746,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
override def run: Unit = {
- var threadPool = Broadcast.newDaemonCachedThreadPool
+ var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
diff --git a/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
index 94c2848d35..e9af7cbbab 100644
--- a/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/CustomBlockedLocalFileShuffle.scala
@@ -151,8 +151,7 @@ extends Shuffle[K, V, C] with Logging {
receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
combiners = new HashMap[K, C]
- var threadPool = Shuffle.newDaemonFixedThreadPool(
- Shuffle.MaxRxConnections)
+ var threadPool = Utils.newDaemonFixedThreadPool(Shuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
@@ -471,7 +470,7 @@ object CustomBlockedLocalFileShuffle extends Logging {
class ShuffleServer
extends Thread with Logging {
- var threadPool = Shuffle.newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
+ var threadPool = Utils.newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
var serverSocket: ServerSocket = null
diff --git a/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
index a338b0259a..0ff4994962 100644
--- a/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/CustomParallelLocalFileShuffle.scala
@@ -99,8 +99,7 @@ extends Shuffle[K, V, C] with Logging {
receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
combiners = new HashMap[K, C]
- var threadPool = Shuffle.newDaemonFixedThreadPool(
- Shuffle.MaxRxConnections)
+ var threadPool = Utils.newDaemonFixedThreadPool(Shuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate = math.min(totalSplits,
@@ -398,7 +397,7 @@ object CustomParallelLocalFileShuffle extends Logging {
class ShuffleServer
extends Thread with Logging {
- var threadPool = Shuffle.newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
+ var threadPool = Utils.newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
var serverSocket: ServerSocket = null
diff --git a/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
index e11ba5fed4..5159d312ca 100644
--- a/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
+++ b/core/src/main/scala/spark/shuffle/HttpParallelLocalFileShuffle.scala
@@ -96,8 +96,7 @@ extends Shuffle[K, V, C] with Logging {
receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
combiners = new HashMap[K, C]
- var threadPool = Shuffle.newDaemonFixedThreadPool(
- Shuffle.MaxRxConnections)
+ var threadPool = Utils.newDaemonFixedThreadPool(Shuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
diff --git a/core/src/main/scala/spark/shuffle/Shuffle.scala b/core/src/main/scala/spark/shuffle/Shuffle.scala
index 96a23beac6..95c8cd8a4b 100644
--- a/core/src/main/scala/spark/shuffle/Shuffle.scala
+++ b/core/src/main/scala/spark/shuffle/Shuffle.scala
@@ -57,35 +57,4 @@ extends Logging {
def MaxChatTime = MaxChatTime_
def MaxChatBlocks = MaxChatBlocks_
-
- // Returns a standard ThreadFactory except all threads are daemons
- private def newDaemonThreadFactory: ThreadFactory = {
- new ThreadFactory {
- def newThread(r: Runnable): Thread = {
- var t = Executors.defaultThreadFactory.newThread(r)
- t.setDaemon(true)
- return t
- }
- }
- }
-
- // Wrapper over newFixedThreadPool
- def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
- var threadPool =
- Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory(newDaemonThreadFactory)
-
- return threadPool
- }
-
- // Wrapper over newCachedThreadPool
- def newDaemonCachedThreadPool: ThreadPoolExecutor = {
- var threadPool =
- Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
-
- threadPool.setThreadFactory(newDaemonThreadFactory)
-
- return threadPool
- }
}
\ No newline at end of file