diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2011-04-01 19:31:28 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2011-04-01 19:31:28 -0700 |
commit | 5bf3c83b13b0ef863330f3c88a2c910e665cb2bb (patch) | |
tree | 260fe422174080803690bc9268692cb7f9ca7c4f /core | |
parent | 733a1301081cab6692612727a93a1386912879a3 (diff) | |
download | spark-5bf3c83b13b0ef863330f3c88a2c910e665cb2bb.tar.gz spark-5bf3c83b13b0ef863330f3c88a2c910e665cb2bb.tar.bz2 spark-5bf3c83b13b0ef863330f3c88a2c910e665cb2bb.zip |
BroadcastSuperTracker (right now for BT) is contacted over TCP instead
of direct procedure call.
Need to do the same for others and consolidate all broadcast mechanisms.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/BitTorrentBroadcast.scala | 179 | ||||
-rw-r--r-- | core/src/main/scala/spark/Broadcast.scala | 45 | ||||
-rw-r--r-- | core/src/main/scala/spark/TreeBroadcast.scala | 6 |
3 files changed, 167 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/BitTorrentBroadcast.scala b/core/src/main/scala/spark/BitTorrentBroadcast.scala index 5a895b96b1..78b6273e1d 100644 --- a/core/src/main/scala/spark/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/BitTorrentBroadcast.scala @@ -123,7 +123,7 @@ extends Broadcast[T] with Logging { listOfSources += masterSource // Register with the Tracker - BitTorrentBroadcast.registerValue(uuid, + registerBroadcast(uuid, SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes, blockSize)) } @@ -194,6 +194,58 @@ extends Broadcast[T] with Logging { stopBroadcast = false } + private def registerBroadcast(uuid: UUID, gInfo: SourceInfo): Unit = { + val socket = new Socket(Broadcast.MasterHostAddress, + Broadcast.MasterTrackerPort) + val oosST = new ObjectOutputStream(socket.getOutputStream) + oosST.flush() + val oisST = new ObjectInputStream(socket.getInputStream) + + // Send messageType/intention + oosST.writeObject(Broadcast.REGISTER_BROADCAST_TRACKER) + oosST.flush() + + // Send UUID of this broadcast + oosST.writeObject(uuid) + oosST.flush() + + // Send this tracker's information + oosST.writeObject(gInfo) + oosST.flush() + + // Receive ACK and throw it away + oisST.readObject.asInstanceOf[Int] + + // Shut stuff down + oisST.close() + oosST.close() + socket.close() + } + + private def unregisterBroadcast(uuid: UUID): Unit = { + val socket = new Socket(Broadcast.MasterHostAddress, + Broadcast.MasterTrackerPort) + val oosST = new ObjectOutputStream(socket.getOutputStream) + oosST.flush() + val oisST = new ObjectInputStream(socket.getInputStream) + + // Send messageType/intention + oosST.writeObject(Broadcast.UNREGISTER_BROADCAST_TRACKER) + oosST.flush() + + // Send UUID of this broadcast + oosST.writeObject(uuid) + oosST.flush() + + // Receive ACK and throw it away + oisST.readObject.asInstanceOf[Int] + + // Shut stuff down + oisST.close() + oosST.close() + socket.close() + } + private def blockifyObject(obj: T): VariableInfo = { val baos = new ByteArrayOutputStream val oos = new ObjectOutputStream(baos) @@ -349,6 +401,10 @@ extends Broadcast[T] with Logging { oisTracker = new ObjectInputStream(clientSocketToTracker.getInputStream) + // Send messageType/intention + oosTracker.writeObject(Broadcast.FIND_BROADCAST_TRACKER) + oosTracker.flush() + // Send UUID and receive GuideInfo oosTracker.writeObject(uuid) oosTracker.flush() @@ -439,11 +495,11 @@ extends Broadcast[T] with Logging { override def run: Unit = { var threadPool = - Broadcast.newDaemonFixedThreadPool(BitTorrentBroadcast.MaxTxPeers) + Broadcast.newDaemonFixedThreadPool(Broadcast.MaxTxPeers) while (hasBlocks < totalBlocks) { var numThreadsToCreate = - math.min(listOfSources.size, BitTorrentBroadcast.MaxTxPeers) - + math.min(listOfSources.size, Broadcast.MaxTxPeers) - threadPool.getActiveCount while (hasBlocks < totalBlocks && numThreadsToCreate > 0) { @@ -735,7 +791,7 @@ extends Broadcast[T] with Logging { // Include blocks already in transmission ONLY IF // BitTorrentBroadcast.EndGameFraction has NOT been achieved - if ((1.0 * hasBlocks / totalBlocks) < BitTorrentBroadcast.EndGameFraction) { + if ((1.0 * hasBlocks / totalBlocks) < Broadcast.EndGameFraction) { blocksInRequestBitVector.synchronized { needBlocksBitVector.or(blocksInRequestBitVector) } @@ -775,7 +831,7 @@ extends Broadcast[T] with Logging { // Include blocks already in transmission ONLY IF // BitTorrentBroadcast.EndGameFraction has NOT been achieved - if ((1.0 * hasBlocks / totalBlocks) < BitTorrentBroadcast.EndGameFraction) { + if ((1.0 * hasBlocks / totalBlocks) < Broadcast.EndGameFraction) { blocksInRequestBitVector.synchronized { needBlocksBitVector.or(blocksInRequestBitVector) } @@ -904,7 +960,7 @@ extends Broadcast[T] with Logging { logInfo("Sending stopBroadcast notifications...") sendStopBroadcastNotifications - BitTorrentBroadcast.unregisterValue(uuid) + unregisterBroadcast(uuid) } finally { if (serverSocket != null) { logInfo("GuideMultipleRequests now stopping...") @@ -1009,10 +1065,10 @@ extends Broadcast[T] with Logging { } listOfSources.synchronized { - if (listOfSources.size <= BitTorrentBroadcast.MaxPeersInGuideResponse) { + if (listOfSources.size <= Broadcast.MaxPeersInGuideResponse) { selectedSources = listOfSources.clone } else { - var picksLeft = BitTorrentBroadcast.MaxPeersInGuideResponse + var picksLeft = Broadcast.MaxPeersInGuideResponse var alreadyPicked = new BitSet(listOfSources.size) while (picksLeft > 0) { @@ -1050,9 +1106,9 @@ extends Broadcast[T] with Logging { class ServeMultipleRequests extends Thread with Logging { - // Server at most BitTorrentBroadcast.MaxRxPeers peers + // Server at most Broadcast.MaxRxPeers peers var threadPool = - Broadcast.newDaemonFixedThreadPool(BitTorrentBroadcast.MaxRxPeers) + Broadcast.newDaemonFixedThreadPool(Broadcast.MaxRxPeers) override def run: Unit = { var serverSocket = new ServerSocket(0) @@ -1126,7 +1182,7 @@ extends Broadcast[T] with Logging { val startTime = System.currentTimeMillis var curTime = startTime var keepSending = true - var numBlocksToSend = BitTorrentBroadcast.MaxChatBlocks + var numBlocksToSend = Broadcast.MaxChatBlocks while (!stopBroadcast && keepSending && numBlocksToSend > 0) { // Receive which block to send @@ -1151,7 +1207,7 @@ extends Broadcast[T] with Logging { curTime = System.currentTimeMillis // Revoke sending only if there is anyone waiting in the queue - if (curTime - startTime >= BitTorrentBroadcast.MaxChatTime && + if (curTime - startTime >= Broadcast.MaxChatTime && threadPool.getQueue.size > 0) { keepSending = false } @@ -1227,21 +1283,6 @@ extends Logging { def initialize(isMaster__ : Boolean): Unit = { synchronized { if (!initialized) { - MaxPeersInGuideResponse_ = System.getProperty( - "spark.broadcast.maxPeersInGuideResponse", "4").toInt - - MaxRxPeers_ = System.getProperty( - "spark.broadcast.maxRxPeers", "4").toInt - MaxTxPeers_ = System.getProperty( - "spark.broadcast.maxTxPeers", "4").toInt - - MaxChatTime_ = System.getProperty( - "spark.broadcast.maxChatTime", "500").toInt - MaxChatBlocks_ = System.getProperty( - "spark.broadcast.maxChatBlocks", "1024").toInt - - EndGameFraction_ = System.getProperty( - "spark.broadcast.endGameFraction", "0.95").toDouble isMaster_ = isMaster__ @@ -1265,30 +1306,6 @@ extends Logging { def isMaster = isMaster_ - def MaxPeersInGuideResponse = MaxPeersInGuideResponse_ - - def MaxRxPeers = MaxRxPeers_ - def MaxTxPeers = MaxTxPeers_ - - def MaxChatTime = MaxChatTime_ - def MaxChatBlocks = MaxChatBlocks_ - - def EndGameFraction = EndGameFraction_ - - def registerValue(uuid: UUID, gInfo: SourceInfo): Unit = { - valueToGuideMap.synchronized { - valueToGuideMap += (uuid -> gInfo) - logInfo("New value registered with the Tracker " + valueToGuideMap) - } - } - - def unregisterValue(uuid: UUID): Unit = { - valueToGuideMap.synchronized { - valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToHDFS) - logInfo("Value unregistered from the Tracker " + valueToGuideMap) - } - } - class TrackMultipleValues extends Thread with Logging { override def run: Unit = { @@ -1317,14 +1334,60 @@ extends Logging { val oos = new ObjectOutputStream(clientSocket.getOutputStream) oos.flush() val ois = new ObjectInputStream(clientSocket.getInputStream) + try { - val uuid = ois.readObject.asInstanceOf[UUID] - var gInfo = - if (valueToGuideMap.contains(uuid)) { - valueToGuideMap(uuid) - } else SourceInfo("", SourceInfo.TxNotStartedRetry) - logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort) - oos.writeObject(gInfo) + // First, read message type + val messageType = ois.readObject.asInstanceOf[Int] + + if (messageType == Broadcast.REGISTER_BROADCAST_TRACKER) { + // Receive UUID + val uuid = ois.readObject.asInstanceOf[UUID] + // Receive hostAddress and listenPort + val gInfo = ois.readObject.asInstanceOf[SourceInfo] + + // Add to the map + valueToGuideMap.synchronized { + valueToGuideMap += (uuid -> gInfo) + } + + logInfo ("New broadcast registered with TrackMultipleValues " + uuid + " " + valueToGuideMap) + + // Send dummy ACK + oos.writeObject(-1) + oos.flush() + } else if (messageType == Broadcast.UNREGISTER_BROADCAST_TRACKER) { + // Receive UUID + val uuid = ois.readObject.asInstanceOf[UUID] + + // Remove from the map + valueToGuideMap.synchronized { + valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToHDFS) + logInfo("Value unregistered from the Tracker " + valueToGuideMap) + } + + logInfo ("Broadcast unregistered from TrackMultipleValues " + uuid + " " + valueToGuideMap) + + // Send dummy ACK + oos.writeObject(-1) + oos.flush() + } else if (messageType == Broadcast.FIND_BROADCAST_TRACKER) { + // Receive UUID + val uuid = ois.readObject.asInstanceOf[UUID] + + var gInfo = + if (valueToGuideMap.contains(uuid)) valueToGuideMap(uuid) + else SourceInfo("", SourceInfo.TxNotStartedRetry) + + logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort) + + // Send reply back + oos.writeObject(gInfo) + oos.flush() + } else if (messageType == Broadcast.GET_UPDATED_SHARE) { + // TODO: Not implemented + } else { + throw new SparkException("Undefined messageType at TrackMultipleValues") + } } catch { case e: Exception => { logInfo("TrackMultipleValues had a " + e) diff --git a/core/src/main/scala/spark/Broadcast.scala b/core/src/main/scala/spark/Broadcast.scala index 2a7c306e3b..d77ec8229f 100644 --- a/core/src/main/scala/spark/Broadcast.scala +++ b/core/src/main/scala/spark/Broadcast.scala @@ -23,6 +23,12 @@ trait BroadcastFactory { private object Broadcast extends Logging { + // Messages + val REGISTER_BROADCAST_TRACKER = 0 + val UNREGISTER_BROADCAST_TRACKER = 1 + val FIND_BROADCAST_TRACKER = 2 + val GET_UPDATED_SHARE = 3 + private var initialized = false private var isMaster_ = false private var broadcastFactory: BroadcastFactory = null @@ -73,8 +79,31 @@ extends Logging { private var MaxKnockInterval_ = System.getProperty( "spark.broadcast.maxKnockInterval", "999").toInt + // Load ChainedBroadcast config params + + // Load TreeBroadcast config params + private var MaxDegree_ = System.getProperty("spark.broadcast.maxDegree", "2").toInt + + // Load BitTorrentBroadcast config params + private var MaxPeersInGuideResponse_ = System.getProperty( + "spark.broadcast.maxPeersInGuideResponse", "4").toInt + + private var MaxRxPeers_ = System.getProperty( + "spark.broadcast.maxRxPeers", "4").toInt + private var MaxTxPeers_ = System.getProperty( + "spark.broadcast.maxTxPeers", "4").toInt + + private var MaxChatTime_ = System.getProperty( + "spark.broadcast.maxChatTime", "500").toInt + private var MaxChatBlocks_ = System.getProperty( + "spark.broadcast.maxChatBlocks", "1024").toInt + + private var EndGameFraction_ = System.getProperty( + "spark.broadcast.endGameFraction", "0.95").toDouble + def isMaster = isMaster_ + // Common config params def MasterHostAddress = MasterHostAddress_ def MasterTrackerPort = MasterTrackerPort_ def BlockSize = BlockSize_ @@ -86,6 +115,22 @@ extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ + // ChainedBroadcast configs + + // TreeBroadcast configs + def MaxDegree = MaxDegree_ + + // BitTorrentBroadcast configs + def MaxPeersInGuideResponse = MaxPeersInGuideResponse_ + + def MaxRxPeers = MaxRxPeers_ + def MaxTxPeers = MaxTxPeers_ + + def MaxChatTime = MaxChatTime_ + def MaxChatBlocks = MaxChatBlocks_ + + def EndGameFraction = EndGameFraction_ + // Returns a standard ThreadFactory except all threads are daemons private def newDaemonThreadFactory: ThreadFactory = { new ThreadFactory { diff --git a/core/src/main/scala/spark/TreeBroadcast.scala b/core/src/main/scala/spark/TreeBroadcast.scala index 2f843ea9c7..955a262aae 100644 --- a/core/src/main/scala/spark/TreeBroadcast.scala +++ b/core/src/main/scala/spark/TreeBroadcast.scala @@ -599,7 +599,7 @@ extends Broadcast[T] with Logging { listOfSources.foreach { source => if (source != skipSourceInfo && - source.currentLeechers < TreeBroadcast.MaxDegree && + source.currentLeechers < Broadcast.MaxDegree && source.currentLeechers > maxLeechers) { selectedSource = source maxLeechers = source.currentLeechers @@ -754,8 +754,6 @@ extends Logging { def initialize(isMaster__ : Boolean): Unit = { synchronized { if (!initialized) { - MaxDegree_ = System.getProperty("spark.broadcast.maxDegree", "2").toInt - isMaster_ = isMaster__ if (isMaster) { @@ -777,8 +775,6 @@ extends Logging { def isMaster = isMaster_ - def MaxDegree = MaxDegree_ - def registerValue(uuid: UUID, guidePort: Int): Unit = { valueToGuidePortMap.synchronized { valueToGuidePortMap += (uuid -> guidePort) |