diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-10 01:03:47 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-10 01:03:47 -0700 |
commit | 34999d97f5634d1ae978517da4949ae437c1e8b4 (patch) | |
tree | a3f2cd2b97a581294750d06785b9920425fdbce5 | |
parent | d6a9680604525b3291ae4918f234729a1b1ea14d (diff) | |
download | spark-34999d97f5634d1ae978517da4949ae437c1e8b4.tar.gz spark-34999d97f5634d1ae978517da4949ae437c1e8b4.tar.bz2 spark-34999d97f5634d1ae978517da4949ae437c1e8b4.zip |
Added stop() to the Broadcast subsystem
6 files changed, 27 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 2932434504..c00b89a975 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -61,7 +61,6 @@ extends Broadcast[T] with Logging with Serializable { var variableInfo = MultiTracker.blockifyObject(value_) // Prepare the value being broadcasted - // TODO: Refactoring and clean-up required here arrayOfBlocks = variableInfo.arrayOfBlocks totalBytes = variableInfo.totalBytes totalBlocks = variableInfo.totalBlocks @@ -1061,11 +1060,7 @@ extends Broadcast[T] with Logging with Serializable { class BitTorrentBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) { - MultiTracker.initialize(isMaster) - } - - def newBroadcast[T](value_ : T, isLocal: Boolean) = { - new BitTorrentBroadcast[T](value_, isLocal) - } + def initialize(isMaster: Boolean) = MultiTracker.initialize(isMaster) + def newBroadcast[T](value_ : T, isLocal: Boolean) = new BitTorrentBroadcast[T](value_, isLocal) + def stop() = MultiTracker.stop } diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index 3126715d6b..135bc31b72 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -56,7 +56,7 @@ object Broadcast extends Logging with Serializable { } def stop() { - // TODO: + broadcastFactory.stop() } def getBroadcastFactory: BroadcastFactory = { diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala index b18908f789..e341d556bf 100644 --- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala @@ -9,4 +9,5 @@ package spark.broadcast trait BroadcastFactory { def initialize(isMaster: Boolean): Unit def newBroadcast[T](value_ : T, isLocal: Boolean): Broadcast[T] + def stop(): Unit } diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index 81ea6d7451..ec8749c4a5 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -44,10 +44,9 @@ extends Broadcast[T] with Logging with Serializable { } class HttpBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) { - HttpBroadcast.initialize(isMaster) - } + def initialize(isMaster: Boolean) = HttpBroadcast.initialize(isMaster) def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal) + def stop() = HttpBroadcast.stop() } private object HttpBroadcast extends Logging { @@ -72,6 +71,12 @@ private object HttpBroadcast extends Logging { } } } + + def stop() { + if (server != null) { + server.stop() + } + } private def createServer() { broadcastDir = Utils.createTempDir() diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala index 7cd0cb6cf1..d5daea9546 100644 --- a/core/src/main/scala/spark/broadcast/MultiTracker.scala +++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala @@ -27,6 +27,8 @@ extends Logging { private var initialized = false private var isMaster_ = false + private var stopBroadcast = false + private var trackMV: TrackMultipleValues = null def initialize(isMaster__ : Boolean) { @@ -45,6 +47,10 @@ extends Logging { } } } + + def stop() { + stopBroadcast = true + } // Load common parameters private var MasterTrackerPort_ = System.getProperty( @@ -122,14 +128,16 @@ extends Logging { logInfo("TrackMultipleValues" + serverSocket) try { - while (true) { + while (!stopBroadcast) { var clientSocket: Socket = null try { serverSocket.setSoTimeout(TrackerSocketTimeout) clientSocket = serverSocket.accept() } catch { case e: Exception => { - logInfo("TrackMultipleValues Timeout. Stopping listening...") + if (stopBroadcast) { + logInfo("Stopping TrackMultipleValues...") + } } } @@ -207,9 +215,7 @@ extends Logging { }) } catch { // In failure, close socket here; else, client thread will close - case ioe: IOException => { - clientSocket.close() - } + case ioe: IOException => clientSocket.close() } } } diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index d5c7752c0e..5c03f5abf4 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -51,7 +51,6 @@ extends Broadcast[T] with Logging with Serializable { var variableInfo = MultiTracker.blockifyObject(value_) // Prepare the value being broadcasted - // TODO: Refactoring and clean-up required here arrayOfBlocks = variableInfo.arrayOfBlocks totalBytes = variableInfo.totalBytes totalBlocks = variableInfo.totalBlocks @@ -618,9 +617,7 @@ extends Broadcast[T] with Logging with Serializable { class TreeBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) { - MultiTracker.initialize(isMaster) - } - def newBroadcast[T](value_ : T, isLocal: Boolean) = - new TreeBroadcast[T](value_, isLocal) + def initialize(isMaster: Boolean) = MultiTracker.initialize(isMaster) + def newBroadcast[T](value_ : T, isLocal: Boolean) = new TreeBroadcast[T](value_, isLocal) + def stop() = MultiTracker.stop } |