aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-09 16:44:42 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-09 16:44:42 -0800
commit50612ab3a9cf1e1633981454da321a4360388f68 (patch)
treebee55f0046ae06ac5e3983fcb6906ade0c4c8e27
parent6cb76d970826e0a102fcd0c0ea80599520034bfe (diff)
downloadspark-50612ab3a9cf1e1633981454da321a4360388f68.tar.gz
spark-50612ab3a9cf1e1633981454da321a4360388f68.tar.bz2
spark-50612ab3a9cf1e1633981454da321a4360388f68.zip
Every thread is a daemon thread => Program shuts down ASAP once its done.
Some log messages added to blank exception handlers.
-rw-r--r--src/scala/spark/Broadcast.scala70
1 files changed, 57 insertions, 13 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 5b1ea156db..476426b674 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -6,7 +6,7 @@ import java.util.{BitSet, Comparator, Random, Timer, TimerTask, UUID}
import com.google.common.collect.MapMaker
-import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor}
+import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.{ListBuffer, Map, Set}
@@ -374,7 +374,9 @@ extends BroadcastRecipe with Logging {
oosTracker.flush
gInfo = oisTracker.readObject.asInstanceOf[SourceInfo]
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("getGuideInfo had a " + e)
+ }
} finally {
if (oisTracker != null) {
oisTracker.close
@@ -454,8 +456,7 @@ extends BroadcastRecipe with Logging {
override def run = {
var threadPool =
- Executors.newFixedThreadPool(
- BroadcastBT.MaxTxPeers).asInstanceOf[ThreadPoolExecutor]
+ BroadcastBT.newDaemonFixedThreadPool (BroadcastBT.MaxTxPeers)
while (hasBlocks < totalBlocks) {
var numThreadsToCreate =
@@ -636,8 +637,7 @@ extends BroadcastRecipe with Logging {
private var setOfCompletedSources = Set[SourceInfo] ()
override def run = {
- // TODO: Cached threadpool has 60s keep alive timer
- var threadPool = Executors.newCachedThreadPool
+ var threadPool = BroadcastBT.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@@ -722,7 +722,9 @@ extends BroadcastRecipe with Logging {
SourceInfo.UnusedParam, SourceInfo.UnusedParam))
gosSource.flush
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("sendStopBroadcastNotifications had a " + e)
+ }
} finally {
if (gisSource != null) {
gisSource.close
@@ -816,7 +818,7 @@ extends BroadcastRecipe with Logging {
// TODO: Not sure if this will be able to fix the number of outgoing links
// We should have a timeout mechanism on the receiver side
var threadPool =
- Executors.newFixedThreadPool(BroadcastBT.MaxRxPeers)
+ BroadcastBT.newDaemonFixedThreadPool(BroadcastBT.MaxRxPeers)
var serverSocket = new ServerSocket (0)
listenPort = serverSocket.getLocalPort
@@ -959,7 +961,9 @@ extends BroadcastRecipe with Logging {
oos.writeObject (arrayOfBlocks(blockIndex))
oos.flush
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("pickAndSendBlock had a " + e)
+ }
}
logInfo ("Sent block: " + blockIndex + " to " + clientSocket)
}
@@ -1225,12 +1229,51 @@ extends Logging {
}
}
+ // Returns a standard ThreadFactory except all threads are daemons
+ private def newDaemonThreadFactory: ThreadFactory = {
+ new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ var t = Executors.defaultThreadFactory.newThread(r)
+ t.setDaemon(true)
+ return t
+ }
+ }
+ }
+
+ // Wrapper over newCachedThreadPool
+ def newDaemonCachedThreadPool: ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
+
+ // Wrapper over newFixedThreadPool
+ def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
+ var threadPool =
+ Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
+
+ threadPool.setThreadFactory (newDaemonThreadFactory)
+
+ return threadPool
+ }
+
class TrackMultipleValues
extends Thread with Logging {
var stopTracker = false
override def run = {
- var threadPool = Executors.newCachedThreadPool
+ var myThreadFactory = new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ var t = Executors.defaultThreadFactory.newThread(r)
+ t.setDaemon(true)
+ return t
+ }
+ }
+
+ var threadPool = BroadcastBT.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (BroadcastBT.MasterTrackerPort)
@@ -1246,7 +1289,6 @@ extends Logging {
} catch {
case e: Exception => {
logInfo ("TrackMultipleValues Timeout. Stopping listening...")
- // TODO: Tracking should be explicitly stopped by the SparkContext
stopTracker = true
}
}
@@ -1265,10 +1307,12 @@ extends Logging {
valueToGuideMap (uuid)
} else SourceInfo ("", SourceInfo.TxNotStartedRetry,
SourceInfo.UnusedParam, SourceInfo.UnusedParam)
- logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
+ logInfo ("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
oos.writeObject (gInfo)
} catch {
- case e: Exception => { }
+ case e: Exception => {
+ logInfo ("TrackMultipleValues had a " + e)
+ }
} finally {
ois.close
oos.close