aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-09 17:05:19 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-09 17:05:19 -0800
commitb58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68 (patch)
tree431924d6ca7278dbf1d7de57eabc9cb6d5d67565
parent1820634dbffab5bf2c9af43d4eba6db8fcb0ba06 (diff)
downloadspark-b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68.tar.gz
spark-b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68.tar.bz2
spark-b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68.zip
All daemon => Fast shutdown :)
-rw-r--r--src/scala/spark/Broadcast.scala64
1 files changed, 49 insertions, 15 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index a9a1e98e2b..2750aef289 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -6,7 +6,7 @@ import java.util.{Comparator, PriorityQueue, Random, UUID}
import com.google.common.collect.MapMaker
-import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor}
+import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{Map, Set}
@@ -257,7 +257,9 @@ extends BroadcastRecipe with Logging {
oosTracker.flush
masterListenPort = oisTracker.readObject.asInstanceOf[Int]
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("getMasterListenPort had a " + e)
+ }
} finally {
if (oisTracker != null) {
oisTracker.close
@@ -428,7 +430,7 @@ extends BroadcastRecipe with Logging {
private var setOfCompletedSources = Set[SourceInfo] ()
override def run = {
- var threadPool = Executors.newCachedThreadPool
+ var threadPool = BroadcastCS.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -510,7 +512,9 @@ extends BroadcastRecipe with Logging {
SourceInfo.StopBroadcast))
gosSource.flush
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("sendStopBroadcastNotifications had a " + e)
+ }
} finally {
if (gisSource != null) {
gisSource.close
@@ -642,7 +646,7 @@ extends BroadcastRecipe with Logging {
class ServeMultipleRequests
extends Thread with Logging {
override def run = {
- var threadPool = Executors.newCachedThreadPool
+ var threadPool = BroadcastCS.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -743,7 +747,9 @@ extends BroadcastRecipe with Logging {
oos.writeObject (arrayOfBlocks(i))
oos.flush
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("sendObject had a " + e)
+ }
}
logInfo ("Sent block: " + i + " to " + clientSocket)
}
@@ -1040,6 +1046,37 @@ extends Logging {
}
}
+ // Returns a standard ThreadFactory except all threads are daemons
+ private def newDaemonThreadFactory: ThreadFactory = {
+ new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ var t = Executors.defaultThreadFactory.newThread (r)
+ t.setDaemon (true)
+ return t
+ }
+ }
+ }
+
+ // Wrapper over newCachedThreadPool
+ def newDaemonCachedThreadPool: ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
+
+ // Wrapper over newFixedThreadPool
+ def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
+
def getSourceSpeed (hostAddress: String): Double = {
sourceToSpeedMap.synchronized {
sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
@@ -1056,27 +1093,22 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
- var stopTracker = false
-
override def run = {
- var threadPool = Executors.newCachedThreadPool
+ var threadPool = BroadcastCS.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (BroadcastCS.MasterTrackerPort)
logInfo ("TrackMultipleValues" + serverSocket)
try {
- while (!stopTracker) {
+ while (true) {
var clientSocket: Socket = null
try {
- // TODO:
serverSocket.setSoTimeout (TrackerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
logInfo ("TrackMultipleValues Timeout. Stopping listening...")
- // TODO: Tracking should be explicitly stopped by the SparkContext
- stopTracker = true
}
}
@@ -1093,10 +1125,12 @@ extends Logging {
if (valueToGuidePortMap.contains (uuid)) {
valueToGuidePortMap (uuid)
} else SourceInfo.TxNotStartedRetry
- logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
+ logInfo ("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
oos.writeObject (guidePort)
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("TrackMultipleValues had a " + e)
+ }
} finally {
ois.close
oos.close