aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2011-04-01 19:31:28 -0700
committerMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2011-04-01 19:31:28 -0700
commit5bf3c83b13b0ef863330f3c88a2c910e665cb2bb (patch)
tree260fe422174080803690bc9268692cb7f9ca7c4f /core
parent733a1301081cab6692612727a93a1386912879a3 (diff)
downloadspark-5bf3c83b13b0ef863330f3c88a2c910e665cb2bb.tar.gz
spark-5bf3c83b13b0ef863330f3c88a2c910e665cb2bb.tar.bz2
spark-5bf3c83b13b0ef863330f3c88a2c910e665cb2bb.zip
BroadcastSuperTracker (right now for BT) is contacted over TCP instead
of direct procedure call. Need to do the same for others and consolidate all broadcast mechanisms.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/BitTorrentBroadcast.scala179
-rw-r--r--core/src/main/scala/spark/Broadcast.scala45
-rw-r--r--core/src/main/scala/spark/TreeBroadcast.scala6
3 files changed, 167 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/BitTorrentBroadcast.scala b/core/src/main/scala/spark/BitTorrentBroadcast.scala
index 5a895b96b1..78b6273e1d 100644
--- a/core/src/main/scala/spark/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/BitTorrentBroadcast.scala
@@ -123,7 +123,7 @@ extends Broadcast[T] with Logging {
listOfSources += masterSource
// Register with the Tracker
- BitTorrentBroadcast.registerValue(uuid,
+ registerBroadcast(uuid,
SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes, blockSize))
}
@@ -194,6 +194,58 @@ extends Broadcast[T] with Logging {
stopBroadcast = false
}
+ private def registerBroadcast(uuid: UUID, gInfo: SourceInfo): Unit = {
+ val socket = new Socket(Broadcast.MasterHostAddress,
+ Broadcast.MasterTrackerPort)
+ val oosST = new ObjectOutputStream(socket.getOutputStream)
+ oosST.flush()
+ val oisST = new ObjectInputStream(socket.getInputStream)
+
+ // Send messageType/intention
+ oosST.writeObject(Broadcast.REGISTER_BROADCAST_TRACKER)
+ oosST.flush()
+
+ // Send UUID of this broadcast
+ oosST.writeObject(uuid)
+ oosST.flush()
+
+ // Send this tracker's information
+ oosST.writeObject(gInfo)
+ oosST.flush()
+
+ // Receive ACK and throw it away
+ oisST.readObject.asInstanceOf[Int]
+
+ // Shut stuff down
+ oisST.close()
+ oosST.close()
+ socket.close()
+ }
+
+ private def unregisterBroadcast(uuid: UUID): Unit = {
+ val socket = new Socket(Broadcast.MasterHostAddress,
+ Broadcast.MasterTrackerPort)
+ val oosST = new ObjectOutputStream(socket.getOutputStream)
+ oosST.flush()
+ val oisST = new ObjectInputStream(socket.getInputStream)
+
+ // Send messageType/intention
+ oosST.writeObject(Broadcast.UNREGISTER_BROADCAST_TRACKER)
+ oosST.flush()
+
+ // Send UUID of this broadcast
+ oosST.writeObject(uuid)
+ oosST.flush()
+
+ // Receive ACK and throw it away
+ oisST.readObject.asInstanceOf[Int]
+
+ // Shut stuff down
+ oisST.close()
+ oosST.close()
+ socket.close()
+ }
+
private def blockifyObject(obj: T): VariableInfo = {
val baos = new ByteArrayOutputStream
val oos = new ObjectOutputStream(baos)
@@ -349,6 +401,10 @@ extends Broadcast[T] with Logging {
oisTracker =
new ObjectInputStream(clientSocketToTracker.getInputStream)
+ // Send messageType/intention
+ oosTracker.writeObject(Broadcast.FIND_BROADCAST_TRACKER)
+ oosTracker.flush()
+
// Send UUID and receive GuideInfo
oosTracker.writeObject(uuid)
oosTracker.flush()
@@ -439,11 +495,11 @@ extends Broadcast[T] with Logging {
override def run: Unit = {
var threadPool =
- Broadcast.newDaemonFixedThreadPool(BitTorrentBroadcast.MaxTxPeers)
+ Broadcast.newDaemonFixedThreadPool(Broadcast.MaxTxPeers)
while (hasBlocks < totalBlocks) {
var numThreadsToCreate =
- math.min(listOfSources.size, BitTorrentBroadcast.MaxTxPeers) -
+ math.min(listOfSources.size, Broadcast.MaxTxPeers) -
threadPool.getActiveCount
while (hasBlocks < totalBlocks && numThreadsToCreate > 0) {
@@ -735,7 +791,7 @@ extends Broadcast[T] with Logging {
// Include blocks already in transmission ONLY IF
// BitTorrentBroadcast.EndGameFraction has NOT been achieved
- if ((1.0 * hasBlocks / totalBlocks) < BitTorrentBroadcast.EndGameFraction) {
+ if ((1.0 * hasBlocks / totalBlocks) < Broadcast.EndGameFraction) {
blocksInRequestBitVector.synchronized {
needBlocksBitVector.or(blocksInRequestBitVector)
}
@@ -775,7 +831,7 @@ extends Broadcast[T] with Logging {
// Include blocks already in transmission ONLY IF
// BitTorrentBroadcast.EndGameFraction has NOT been achieved
- if ((1.0 * hasBlocks / totalBlocks) < BitTorrentBroadcast.EndGameFraction) {
+ if ((1.0 * hasBlocks / totalBlocks) < Broadcast.EndGameFraction) {
blocksInRequestBitVector.synchronized {
needBlocksBitVector.or(blocksInRequestBitVector)
}
@@ -904,7 +960,7 @@ extends Broadcast[T] with Logging {
logInfo("Sending stopBroadcast notifications...")
sendStopBroadcastNotifications
- BitTorrentBroadcast.unregisterValue(uuid)
+ unregisterBroadcast(uuid)
} finally {
if (serverSocket != null) {
logInfo("GuideMultipleRequests now stopping...")
@@ -1009,10 +1065,10 @@ extends Broadcast[T] with Logging {
}
listOfSources.synchronized {
- if (listOfSources.size <= BitTorrentBroadcast.MaxPeersInGuideResponse) {
+ if (listOfSources.size <= Broadcast.MaxPeersInGuideResponse) {
selectedSources = listOfSources.clone
} else {
- var picksLeft = BitTorrentBroadcast.MaxPeersInGuideResponse
+ var picksLeft = Broadcast.MaxPeersInGuideResponse
var alreadyPicked = new BitSet(listOfSources.size)
while (picksLeft > 0) {
@@ -1050,9 +1106,9 @@ extends Broadcast[T] with Logging {
class ServeMultipleRequests
extends Thread with Logging {
- // Server at most BitTorrentBroadcast.MaxRxPeers peers
+ // Server at most Broadcast.MaxRxPeers peers
var threadPool =
- Broadcast.newDaemonFixedThreadPool(BitTorrentBroadcast.MaxRxPeers)
+ Broadcast.newDaemonFixedThreadPool(Broadcast.MaxRxPeers)
override def run: Unit = {
var serverSocket = new ServerSocket(0)
@@ -1126,7 +1182,7 @@ extends Broadcast[T] with Logging {
val startTime = System.currentTimeMillis
var curTime = startTime
var keepSending = true
- var numBlocksToSend = BitTorrentBroadcast.MaxChatBlocks
+ var numBlocksToSend = Broadcast.MaxChatBlocks
while (!stopBroadcast && keepSending && numBlocksToSend > 0) {
// Receive which block to send
@@ -1151,7 +1207,7 @@ extends Broadcast[T] with Logging {
curTime = System.currentTimeMillis
// Revoke sending only if there is anyone waiting in the queue
- if (curTime - startTime >= BitTorrentBroadcast.MaxChatTime &&
+ if (curTime - startTime >= Broadcast.MaxChatTime &&
threadPool.getQueue.size > 0) {
keepSending = false
}
@@ -1227,21 +1283,6 @@ extends Logging {
def initialize(isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
- MaxPeersInGuideResponse_ = System.getProperty(
- "spark.broadcast.maxPeersInGuideResponse", "4").toInt
-
- MaxRxPeers_ = System.getProperty(
- "spark.broadcast.maxRxPeers", "4").toInt
- MaxTxPeers_ = System.getProperty(
- "spark.broadcast.maxTxPeers", "4").toInt
-
- MaxChatTime_ = System.getProperty(
- "spark.broadcast.maxChatTime", "500").toInt
- MaxChatBlocks_ = System.getProperty(
- "spark.broadcast.maxChatBlocks", "1024").toInt
-
- EndGameFraction_ = System.getProperty(
- "spark.broadcast.endGameFraction", "0.95").toDouble
isMaster_ = isMaster__
@@ -1265,30 +1306,6 @@ extends Logging {
def isMaster = isMaster_
- def MaxPeersInGuideResponse = MaxPeersInGuideResponse_
-
- def MaxRxPeers = MaxRxPeers_
- def MaxTxPeers = MaxTxPeers_
-
- def MaxChatTime = MaxChatTime_
- def MaxChatBlocks = MaxChatBlocks_
-
- def EndGameFraction = EndGameFraction_
-
- def registerValue(uuid: UUID, gInfo: SourceInfo): Unit = {
- valueToGuideMap.synchronized {
- valueToGuideMap += (uuid -> gInfo)
- logInfo("New value registered with the Tracker " + valueToGuideMap)
- }
- }
-
- def unregisterValue(uuid: UUID): Unit = {
- valueToGuideMap.synchronized {
- valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToHDFS)
- logInfo("Value unregistered from the Tracker " + valueToGuideMap)
- }
- }
-
class TrackMultipleValues
extends Thread with Logging {
override def run: Unit = {
@@ -1317,14 +1334,60 @@ extends Logging {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
+
try {
- val uuid = ois.readObject.asInstanceOf[UUID]
- var gInfo =
- if (valueToGuideMap.contains(uuid)) {
- valueToGuideMap(uuid)
- } else SourceInfo("", SourceInfo.TxNotStartedRetry)
- logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
- oos.writeObject(gInfo)
+ // First, read message type
+ val messageType = ois.readObject.asInstanceOf[Int]
+
+ if (messageType == Broadcast.REGISTER_BROADCAST_TRACKER) {
+ // Receive UUID
+ val uuid = ois.readObject.asInstanceOf[UUID]
+ // Receive hostAddress and listenPort
+ val gInfo = ois.readObject.asInstanceOf[SourceInfo]
+
+ // Add to the map
+ valueToGuideMap.synchronized {
+ valueToGuideMap += (uuid -> gInfo)
+ }
+
+ logInfo ("New broadcast registered with TrackMultipleValues " + uuid + " " + valueToGuideMap)
+
+ // Send dummy ACK
+ oos.writeObject(-1)
+ oos.flush()
+ } else if (messageType == Broadcast.UNREGISTER_BROADCAST_TRACKER) {
+ // Receive UUID
+ val uuid = ois.readObject.asInstanceOf[UUID]
+
+ // Remove from the map
+ valueToGuideMap.synchronized {
+ valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToHDFS)
+ logInfo("Value unregistered from the Tracker " + valueToGuideMap)
+ }
+
+ logInfo ("Broadcast unregistered from TrackMultipleValues " + uuid + " " + valueToGuideMap)
+
+ // Send dummy ACK
+ oos.writeObject(-1)
+ oos.flush()
+ } else if (messageType == Broadcast.FIND_BROADCAST_TRACKER) {
+ // Receive UUID
+ val uuid = ois.readObject.asInstanceOf[UUID]
+
+ var gInfo =
+ if (valueToGuideMap.contains(uuid)) valueToGuideMap(uuid)
+ else SourceInfo("", SourceInfo.TxNotStartedRetry)
+
+ logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
+
+ // Send reply back
+ oos.writeObject(gInfo)
+ oos.flush()
+ } else if (messageType == Broadcast.GET_UPDATED_SHARE) {
+ // TODO: Not implemented
+ } else {
+ throw new SparkException("Undefined messageType at TrackMultipleValues")
+ }
} catch {
case e: Exception => {
logInfo("TrackMultipleValues had a " + e)
diff --git a/core/src/main/scala/spark/Broadcast.scala b/core/src/main/scala/spark/Broadcast.scala
index 2a7c306e3b..d77ec8229f 100644
--- a/core/src/main/scala/spark/Broadcast.scala
+++ b/core/src/main/scala/spark/Broadcast.scala
@@ -23,6 +23,12 @@ trait BroadcastFactory {
private object Broadcast
extends Logging {
+ // Messages
+ val REGISTER_BROADCAST_TRACKER = 0
+ val UNREGISTER_BROADCAST_TRACKER = 1
+ val FIND_BROADCAST_TRACKER = 2
+ val GET_UPDATED_SHARE = 3
+
private var initialized = false
private var isMaster_ = false
private var broadcastFactory: BroadcastFactory = null
@@ -73,8 +79,31 @@ extends Logging {
private var MaxKnockInterval_ = System.getProperty(
"spark.broadcast.maxKnockInterval", "999").toInt
+ // Load ChainedBroadcast config params
+
+ // Load TreeBroadcast config params
+ private var MaxDegree_ = System.getProperty("spark.broadcast.maxDegree", "2").toInt
+
+ // Load BitTorrentBroadcast config params
+ private var MaxPeersInGuideResponse_ = System.getProperty(
+ "spark.broadcast.maxPeersInGuideResponse", "4").toInt
+
+ private var MaxRxPeers_ = System.getProperty(
+ "spark.broadcast.maxRxPeers", "4").toInt
+ private var MaxTxPeers_ = System.getProperty(
+ "spark.broadcast.maxTxPeers", "4").toInt
+
+ private var MaxChatTime_ = System.getProperty(
+ "spark.broadcast.maxChatTime", "500").toInt
+ private var MaxChatBlocks_ = System.getProperty(
+ "spark.broadcast.maxChatBlocks", "1024").toInt
+
+ private var EndGameFraction_ = System.getProperty(
+ "spark.broadcast.endGameFraction", "0.95").toDouble
+
def isMaster = isMaster_
+ // Common config params
def MasterHostAddress = MasterHostAddress_
def MasterTrackerPort = MasterTrackerPort_
def BlockSize = BlockSize_
@@ -86,6 +115,22 @@ extends Logging {
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
+ // ChainedBroadcast configs
+
+ // TreeBroadcast configs
+ def MaxDegree = MaxDegree_
+
+ // BitTorrentBroadcast configs
+ def MaxPeersInGuideResponse = MaxPeersInGuideResponse_
+
+ def MaxRxPeers = MaxRxPeers_
+ def MaxTxPeers = MaxTxPeers_
+
+ def MaxChatTime = MaxChatTime_
+ def MaxChatBlocks = MaxChatBlocks_
+
+ def EndGameFraction = EndGameFraction_
+
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
diff --git a/core/src/main/scala/spark/TreeBroadcast.scala b/core/src/main/scala/spark/TreeBroadcast.scala
index 2f843ea9c7..955a262aae 100644
--- a/core/src/main/scala/spark/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/TreeBroadcast.scala
@@ -599,7 +599,7 @@ extends Broadcast[T] with Logging {
listOfSources.foreach { source =>
if (source != skipSourceInfo &&
- source.currentLeechers < TreeBroadcast.MaxDegree &&
+ source.currentLeechers < Broadcast.MaxDegree &&
source.currentLeechers > maxLeechers) {
selectedSource = source
maxLeechers = source.currentLeechers
@@ -754,8 +754,6 @@ extends Logging {
def initialize(isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
- MaxDegree_ = System.getProperty("spark.broadcast.maxDegree", "2").toInt
-
isMaster_ = isMaster__
if (isMaster) {
@@ -777,8 +775,6 @@ extends Logging {
def isMaster = isMaster_
- def MaxDegree = MaxDegree_
-
def registerValue(uuid: UUID, guidePort: Int): Unit = {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)