aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-07-10 01:03:47 -0700
committerMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-07-10 01:03:47 -0700
commit34999d97f5634d1ae978517da4949ae437c1e8b4 (patch)
treea3f2cd2b97a581294750d06785b9920425fdbce5
parentd6a9680604525b3291ae4918f234729a1b1ea14d (diff)
downloadspark-34999d97f5634d1ae978517da4949ae437c1e8b4.tar.gz
spark-34999d97f5634d1ae978517da4949ae437c1e8b4.tar.bz2
spark-34999d97f5634d1ae978517da4949ae437c1e8b4.zip
Added stop() to the Broadcast subsystem
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala11
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala2
-rw-r--r--core/src/main/scala/spark/broadcast/BroadcastFactory.scala1
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala11
-rw-r--r--core/src/main/scala/spark/broadcast/MultiTracker.scala16
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala9
6 files changed, 27 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 2932434504..c00b89a975 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -61,7 +61,6 @@ extends Broadcast[T] with Logging with Serializable {
var variableInfo = MultiTracker.blockifyObject(value_)
// Prepare the value being broadcasted
- // TODO: Refactoring and clean-up required here
arrayOfBlocks = variableInfo.arrayOfBlocks
totalBytes = variableInfo.totalBytes
totalBlocks = variableInfo.totalBlocks
@@ -1061,11 +1060,7 @@ extends Broadcast[T] with Logging with Serializable {
class BitTorrentBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) {
- MultiTracker.initialize(isMaster)
- }
-
- def newBroadcast[T](value_ : T, isLocal: Boolean) = {
- new BitTorrentBroadcast[T](value_, isLocal)
- }
+ def initialize(isMaster: Boolean) = MultiTracker.initialize(isMaster)
+ def newBroadcast[T](value_ : T, isLocal: Boolean) = new BitTorrentBroadcast[T](value_, isLocal)
+ def stop() = MultiTracker.stop
}
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 3126715d6b..135bc31b72 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -56,7 +56,7 @@ object Broadcast extends Logging with Serializable {
}
def stop() {
- // TODO:
+ broadcastFactory.stop()
}
def getBroadcastFactory: BroadcastFactory = {
diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
index b18908f789..e341d556bf 100644
--- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
@@ -9,4 +9,5 @@ package spark.broadcast
trait BroadcastFactory {
def initialize(isMaster: Boolean): Unit
def newBroadcast[T](value_ : T, isLocal: Boolean): Broadcast[T]
+ def stop(): Unit
}
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 81ea6d7451..ec8749c4a5 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -44,10 +44,9 @@ extends Broadcast[T] with Logging with Serializable {
}
class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isMaster: Boolean) {
- HttpBroadcast.initialize(isMaster)
- }
+ def initialize(isMaster: Boolean) = HttpBroadcast.initialize(isMaster)
def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal)
+ def stop() = HttpBroadcast.stop()
}
private object HttpBroadcast extends Logging {
@@ -72,6 +71,12 @@ private object HttpBroadcast extends Logging {
}
}
}
+
+ def stop() {
+ if (server != null) {
+ server.stop()
+ }
+ }
private def createServer() {
broadcastDir = Utils.createTempDir()
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index 7cd0cb6cf1..d5daea9546 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -27,6 +27,8 @@ extends Logging {
private var initialized = false
private var isMaster_ = false
+ private var stopBroadcast = false
+
private var trackMV: TrackMultipleValues = null
def initialize(isMaster__ : Boolean) {
@@ -45,6 +47,10 @@ extends Logging {
}
}
}
+
+ def stop() {
+ stopBroadcast = true
+ }
// Load common parameters
private var MasterTrackerPort_ = System.getProperty(
@@ -122,14 +128,16 @@ extends Logging {
logInfo("TrackMultipleValues" + serverSocket)
try {
- while (true) {
+ while (!stopBroadcast) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout(TrackerSocketTimeout)
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
- logInfo("TrackMultipleValues Timeout. Stopping listening...")
+ if (stopBroadcast) {
+ logInfo("Stopping TrackMultipleValues...")
+ }
}
}
@@ -207,9 +215,7 @@ extends Logging {
})
} catch {
// In failure, close socket here; else, client thread will close
- case ioe: IOException => {
- clientSocket.close()
- }
+ case ioe: IOException => clientSocket.close()
}
}
}
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index d5c7752c0e..5c03f5abf4 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -51,7 +51,6 @@ extends Broadcast[T] with Logging with Serializable {
var variableInfo = MultiTracker.blockifyObject(value_)
// Prepare the value being broadcasted
- // TODO: Refactoring and clean-up required here
arrayOfBlocks = variableInfo.arrayOfBlocks
totalBytes = variableInfo.totalBytes
totalBlocks = variableInfo.totalBlocks
@@ -618,9 +617,7 @@ extends Broadcast[T] with Logging with Serializable {
class TreeBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) {
- MultiTracker.initialize(isMaster)
- }
- def newBroadcast[T](value_ : T, isLocal: Boolean) =
- new TreeBroadcast[T](value_, isLocal)
+ def initialize(isMaster: Boolean) = MultiTracker.initialize(isMaster)
+ def newBroadcast[T](value_ : T, isLocal: Boolean) = new TreeBroadcast[T](value_, isLocal)
+ def stop() = MultiTracker.stop
}