diff options
author | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-13 01:04:01 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@cs.berkeley.edu> | 2012-07-13 01:04:01 -0700 |
commit | bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725 (patch) | |
tree | 4946981bf64813c751f736778943905dedb16329 /core | |
parent | 8ccffe21da8a65344109f63b3b4baf948028392f (diff) | |
download | spark-bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725.tar.gz spark-bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725.tar.bz2 spark-bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725.zip |
Cleaning BitTorrentBroadcast code...
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala | 65 | ||||
-rw-r--r-- | core/src/main/scala/spark/broadcast/MultiTracker.scala | 11 |
2 files changed, 23 insertions, 53 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 0700a9267e..3466d663af 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -80,9 +80,7 @@ extends Broadcast[T] with Logging with Serializable { // Must always come AFTER guideMR is created while (guidePort == -1) { - guidePortLock.synchronized { - guidePortLock.wait() - } + guidePortLock.synchronized { guidePortLock.wait() } } serveMR = new ServeMultipleRequests @@ -92,9 +90,7 @@ extends Broadcast[T] with Logging with Serializable { // Must always come AFTER serveMR is created while (listenPort == -1) { - listenPortLock.synchronized { - listenPortLock.wait() - } + listenPortLock.synchronized { listenPortLock.wait() } } // Must always come AFTER listenPort is created @@ -174,16 +170,12 @@ extends Broadcast[T] with Logging with Serializable { private def getLocalSourceInfo: SourceInfo = { // Wait till hostName and listenPort are OK while (listenPort == -1) { - listenPortLock.synchronized { - listenPortLock.wait() - } + listenPortLock.synchronized { listenPortLock.wait() } } // Wait till totalBlocks and totalBytes are OK while (totalBlocks == -1) { - totalBlocksLock.synchronized { - totalBlocksLock.wait() - } + totalBlocksLock.synchronized { totalBlocksLock.wait() } } var localSourceInfo = SourceInfo( @@ -199,7 +191,7 @@ extends Broadcast[T] with Logging with Serializable { } // Add new SourceInfo to the listOfSources. Update if it exists already. - // TODO: Optimizing just by OR-ing the BitVectors was BAD for performance + // Optimizing just by OR-ing the BitVectors was BAD for performance private def addToListOfSources(newSourceInfo: SourceInfo) { listOfSources.synchronized { if (listOfSources.contains(newSourceInfo)) { @@ -269,9 +261,7 @@ extends Broadcast[T] with Logging with Serializable { // Wait until hostAddress and listenPort are created by the // ServeMultipleRequests thread while (listenPort == -1) { - listenPortLock.synchronized { - listenPortLock.wait() - } + listenPortLock.synchronized { listenPortLock.wait() } } // Setup initial states of variables @@ -279,9 +269,7 @@ extends Broadcast[T] with Logging with Serializable { arrayOfBlocks = new Array[BroadcastBlock](totalBlocks) hasBlocksBitVector = new BitSet(totalBlocks) numCopiesSent = new Array[Int](totalBlocks) - totalBlocksLock.synchronized { - totalBlocksLock.notifyAll() - } + totalBlocksLock.synchronized { totalBlocksLock.notifyAll() } totalBytes = gInfo.totalBytes // Start ttGuide to periodically talk to the Guide @@ -313,11 +301,11 @@ extends Broadcast[T] with Logging with Serializable { private var blocksInRequestBitVector = new BitSet(totalBlocks) override def run() { - var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxRxSlots) + var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots) while (hasBlocks.get < totalBlocks) { var numThreadsToCreate = - math.min(listOfSources.size, MultiTracker.MaxRxSlots) - + math.min(listOfSources.size, MultiTracker.MaxChatSlots) - threadPool.getActiveCount while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) { @@ -333,9 +321,7 @@ extends Broadcast[T] with Logging with Serializable { // Add to peersNowTalking. Remove in the thread. We have to do this // ASAP, otherwise pickPeerToTalkTo picks the same peer more than once - peersNowTalking.synchronized { - peersNowTalking += peerToTalkTo - } + peersNowTalking.synchronized { peersNowTalking += peerToTalkTo } } numThreadsToCreate = numThreadsToCreate - 1 @@ -379,8 +365,7 @@ extends Broadcast[T] with Logging with Serializable { } } - // TODO: Always pick randomly or randomly pick randomly? - // Now always picking randomly + // Always picking randomly if (curPeer == null && peersNotInUse.size > 0) { // Pick uniformly the i'th required peer var i = MultiTracker.ranGen.nextInt(peersNotInUse.size) @@ -419,8 +404,8 @@ extends Broadcast[T] with Logging with Serializable { } } - // TODO: A block is rare if there are at most 2 copies of that block - // TODO: This CONSTANT could be a function of the neighborhood size + // A block is considered rare if there are at most 2 copies of that block + // This CONSTANT could be a function of the neighborhood size var rareBlocksIndices = ListBuffer[Int]() for (i <- 0 until totalBlocks) { if (numCopiesPerBlock(i) > 0 && numCopiesPerBlock(i) <= 2) { @@ -713,9 +698,7 @@ extends Broadcast[T] with Logging with Serializable { } // Delete from peersNowTalking - peersNowTalking.synchronized { - peersNowTalking = peersNowTalking - peerToTalkTo - } + peersNowTalking.synchronized { peersNowTalking -= peerToTalkTo } } } } @@ -733,9 +716,7 @@ extends Broadcast[T] with Logging with Serializable { guidePort = serverSocket.getLocalPort logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort) - guidePortLock.synchronized { - guidePortLock.notifyAll() - } + guidePortLock.synchronized { guidePortLock.notifyAll() } try { // Don't stop until there is a copy in HDFS @@ -852,9 +833,7 @@ extends Broadcast[T] with Logging with Serializable { case e: Exception => { // Assuming exception caused by receiver failure: remove if (listOfSources != null) { - listOfSources.synchronized { - listOfSources = listOfSources - sourceInfo - } + listOfSources.synchronized { listOfSources -= sourceInfo } } } } finally { @@ -871,9 +850,7 @@ extends Broadcast[T] with Logging with Serializable { // If skipSourceInfo.hasBlocksBitVector has all bits set to 'true' // then add skipSourceInfo to setOfCompletedSources. Return blank. if (skipSourceInfo.hasBlocks == totalBlocks) { - setOfCompletedSources.synchronized { - setOfCompletedSources += skipSourceInfo - } + setOfCompletedSources.synchronized { setOfCompletedSources += skipSourceInfo } return selectedSources } @@ -919,8 +896,8 @@ extends Broadcast[T] with Logging with Serializable { class ServeMultipleRequests extends Thread with Logging { - // Server at most MultiTracker.MaxTxSlots peers - var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxTxSlots) + // Server at most MultiTracker.MaxChatSlots peers + var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots) override def run() { var serverSocket = new ServerSocket(0) @@ -928,9 +905,7 @@ extends Broadcast[T] with Logging with Serializable { logInfo("ServeMultipleRequests started with " + serverSocket) - listenPortLock.synchronized { - listenPortLock.notifyAll() - } + listenPortLock.synchronized { listenPortLock.notifyAll() } try { while (!stopBroadcast) { diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala index 5e6de903e3..10b90526e8 100644 --- a/core/src/main/scala/spark/broadcast/MultiTracker.scala +++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala @@ -77,11 +77,8 @@ extends Logging { private var MaxPeersInGuideResponse_ = System.getProperty( "spark.broadcast.maxPeersInGuideResponse", "4").toInt - private var MaxRxSlots_ = System.getProperty( - "spark.broadcast.maxRxSlots", "4").toInt - private var MaxTxSlots_ = System.getProperty( - "spark.broadcast.maxTxSlots", "4").toInt - + private var MaxChatSlots_ = System.getProperty( + "spark.broadcast.maxChatSlots", "4").toInt private var MaxChatTime_ = System.getProperty( "spark.broadcast.maxChatTime", "500").toInt private var MaxChatBlocks_ = System.getProperty( @@ -109,9 +106,7 @@ extends Logging { // BitTorrentBroadcast configs def MaxPeersInGuideResponse = MaxPeersInGuideResponse_ - def MaxRxSlots = MaxRxSlots_ - def MaxTxSlots = MaxTxSlots_ - + def MaxChatSlots = MaxChatSlots_ def MaxChatTime = MaxChatTime_ def MaxChatBlocks = MaxChatBlocks_ |