diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-09 17:05:19 -0800 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-09 17:05:19 -0800 |
commit | b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68 (patch) | |
tree | 431924d6ca7278dbf1d7de57eabc9cb6d5d67565 | |
parent | 1820634dbffab5bf2c9af43d4eba6db8fcb0ba06 (diff) | |
download | spark-b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68.tar.gz spark-b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68.tar.bz2 spark-b58bbadbfbaaaa5f3de79e65e5a5e1928ae39d68.zip |
All daemon => Fast shutdown :)
-rw-r--r-- | src/scala/spark/Broadcast.scala | 64 |
1 files changed, 49 insertions, 15 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index a9a1e98e2b..2750aef289 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -6,7 +6,7 @@ import java.util.{Comparator, PriorityQueue, Random, UUID} import com.google.common.collect.MapMaker -import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor} +import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{Map, Set} @@ -257,7 +257,9 @@ extends BroadcastRecipe with Logging { oosTracker.flush masterListenPort = oisTracker.readObject.asInstanceOf[Int] } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("getMasterListenPort had a " + e) + } } finally { if (oisTracker != null) { oisTracker.close @@ -428,7 +430,7 @@ extends BroadcastRecipe with Logging { private var setOfCompletedSources = Set[SourceInfo] () override def run = { - var threadPool = Executors.newCachedThreadPool + var threadPool = BroadcastCS.newDaemonCachedThreadPool var serverSocket: ServerSocket = null serverSocket = new ServerSocket (0) @@ -510,7 +512,9 @@ extends BroadcastRecipe with Logging { SourceInfo.StopBroadcast)) gosSource.flush } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("sendStopBroadcastNotifications had a " + e) + } } finally { if (gisSource != null) { gisSource.close @@ -642,7 +646,7 @@ extends BroadcastRecipe with Logging { class ServeMultipleRequests extends Thread with Logging { override def run = { - var threadPool = Executors.newCachedThreadPool + var threadPool = BroadcastCS.newDaemonCachedThreadPool var serverSocket: ServerSocket = null serverSocket = new ServerSocket (0) @@ -743,7 +747,9 @@ extends BroadcastRecipe with Logging { oos.writeObject (arrayOfBlocks(i)) oos.flush } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("sendObject had a " + e) + } } logInfo ("Sent block: " + i + " to " + clientSocket) } @@ -1040,6 +1046,37 @@ extends Logging { } } + // Returns a standard ThreadFactory except all threads are daemons + private def newDaemonThreadFactory: ThreadFactory = { + new ThreadFactory { + def newThread(r: Runnable): Thread = { + var t = Executors.defaultThreadFactory.newThread (r) + t.setDaemon (true) + return t + } + } + } + + // Wrapper over newCachedThreadPool + def newDaemonCachedThreadPool: ThreadPoolExecutor = { + var threadPool = + Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor] + + threadPool.setThreadFactory (newDaemonThreadFactory) + + return threadPool + } + + // Wrapper over newFixedThreadPool + def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = { + var threadPool = + Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor] + + threadPool.setThreadFactory (newDaemonThreadFactory) + + return threadPool + } + def getSourceSpeed (hostAddress: String): Double = { sourceToSpeedMap.synchronized { sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps) @@ -1056,27 +1093,22 @@ extends Logging { class TrackMultipleValues extends Thread with Logging { - var stopTracker = false - override def run = { - var threadPool = Executors.newCachedThreadPool + var threadPool = BroadcastCS.newDaemonCachedThreadPool var serverSocket: ServerSocket = null serverSocket = new ServerSocket (BroadcastCS.MasterTrackerPort) logInfo ("TrackMultipleValues" + serverSocket) try { - while (!stopTracker) { + while (true) { var clientSocket: Socket = null try { - // TODO: serverSocket.setSoTimeout (TrackerSocketTimeout) clientSocket = serverSocket.accept } catch { case e: Exception => { logInfo ("TrackMultipleValues Timeout. Stopping listening...") - // TODO: Tracking should be explicitly stopped by the SparkContext - stopTracker = true } } @@ -1093,10 +1125,12 @@ extends Logging { if (valueToGuidePortMap.contains (uuid)) { valueToGuidePortMap (uuid) } else SourceInfo.TxNotStartedRetry - logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort) + logInfo ("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + guidePort) oos.writeObject (guidePort) } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("TrackMultipleValues had a " + e) + } } finally { ois.close oos.close |