aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-07-13 01:04:01 -0700
committerMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-07-13 01:04:01 -0700
commitbb4ee580fad53dc9c29b7abbfee6f7c13cd8a725 (patch)
tree4946981bf64813c751f736778943905dedb16329 /core
parent8ccffe21da8a65344109f63b3b4baf948028392f (diff)
downloadspark-bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725.tar.gz
spark-bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725.tar.bz2
spark-bb4ee580fad53dc9c29b7abbfee6f7c13cd8a725.zip
Cleaning BitTorrentBroadcast code...
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala65
-rw-r--r--core/src/main/scala/spark/broadcast/MultiTracker.scala11
2 files changed, 23 insertions, 53 deletions
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 0700a9267e..3466d663af 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -80,9 +80,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER guideMR is created
while (guidePort == -1) {
- guidePortLock.synchronized {
- guidePortLock.wait()
- }
+ guidePortLock.synchronized { guidePortLock.wait() }
}
serveMR = new ServeMultipleRequests
@@ -92,9 +90,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER serveMR is created
while (listenPort == -1) {
- listenPortLock.synchronized {
- listenPortLock.wait()
- }
+ listenPortLock.synchronized { listenPortLock.wait() }
}
// Must always come AFTER listenPort is created
@@ -174,16 +170,12 @@ extends Broadcast[T] with Logging with Serializable {
private def getLocalSourceInfo: SourceInfo = {
// Wait till hostName and listenPort are OK
while (listenPort == -1) {
- listenPortLock.synchronized {
- listenPortLock.wait()
- }
+ listenPortLock.synchronized { listenPortLock.wait() }
}
// Wait till totalBlocks and totalBytes are OK
while (totalBlocks == -1) {
- totalBlocksLock.synchronized {
- totalBlocksLock.wait()
- }
+ totalBlocksLock.synchronized { totalBlocksLock.wait() }
}
var localSourceInfo = SourceInfo(
@@ -199,7 +191,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Add new SourceInfo to the listOfSources. Update if it exists already.
- // TODO: Optimizing just by OR-ing the BitVectors was BAD for performance
+ // Optimizing just by OR-ing the BitVectors was BAD for performance
private def addToListOfSources(newSourceInfo: SourceInfo) {
listOfSources.synchronized {
if (listOfSources.contains(newSourceInfo)) {
@@ -269,9 +261,7 @@ extends Broadcast[T] with Logging with Serializable {
// Wait until hostAddress and listenPort are created by the
// ServeMultipleRequests thread
while (listenPort == -1) {
- listenPortLock.synchronized {
- listenPortLock.wait()
- }
+ listenPortLock.synchronized { listenPortLock.wait() }
}
// Setup initial states of variables
@@ -279,9 +269,7 @@ extends Broadcast[T] with Logging with Serializable {
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
hasBlocksBitVector = new BitSet(totalBlocks)
numCopiesSent = new Array[Int](totalBlocks)
- totalBlocksLock.synchronized {
- totalBlocksLock.notifyAll()
- }
+ totalBlocksLock.synchronized { totalBlocksLock.notifyAll() }
totalBytes = gInfo.totalBytes
// Start ttGuide to periodically talk to the Guide
@@ -313,11 +301,11 @@ extends Broadcast[T] with Logging with Serializable {
private var blocksInRequestBitVector = new BitSet(totalBlocks)
override def run() {
- var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxRxSlots)
+ var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
while (hasBlocks.get < totalBlocks) {
var numThreadsToCreate =
- math.min(listOfSources.size, MultiTracker.MaxRxSlots) -
+ math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
threadPool.getActiveCount
while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) {
@@ -333,9 +321,7 @@ extends Broadcast[T] with Logging with Serializable {
// Add to peersNowTalking. Remove in the thread. We have to do this
// ASAP, otherwise pickPeerToTalkTo picks the same peer more than once
- peersNowTalking.synchronized {
- peersNowTalking += peerToTalkTo
- }
+ peersNowTalking.synchronized { peersNowTalking += peerToTalkTo }
}
numThreadsToCreate = numThreadsToCreate - 1
@@ -379,8 +365,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- // TODO: Always pick randomly or randomly pick randomly?
- // Now always picking randomly
+ // Always picking randomly
if (curPeer == null && peersNotInUse.size > 0) {
// Pick uniformly the i'th required peer
var i = MultiTracker.ranGen.nextInt(peersNotInUse.size)
@@ -419,8 +404,8 @@ extends Broadcast[T] with Logging with Serializable {
}
}
- // TODO: A block is rare if there are at most 2 copies of that block
- // TODO: This CONSTANT could be a function of the neighborhood size
+ // A block is considered rare if there are at most 2 copies of that block
+ // This CONSTANT could be a function of the neighborhood size
var rareBlocksIndices = ListBuffer[Int]()
for (i <- 0 until totalBlocks) {
if (numCopiesPerBlock(i) > 0 && numCopiesPerBlock(i) <= 2) {
@@ -713,9 +698,7 @@ extends Broadcast[T] with Logging with Serializable {
}
// Delete from peersNowTalking
- peersNowTalking.synchronized {
- peersNowTalking = peersNowTalking - peerToTalkTo
- }
+ peersNowTalking.synchronized { peersNowTalking -= peerToTalkTo }
}
}
}
@@ -733,9 +716,7 @@ extends Broadcast[T] with Logging with Serializable {
guidePort = serverSocket.getLocalPort
logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
- guidePortLock.synchronized {
- guidePortLock.notifyAll()
- }
+ guidePortLock.synchronized { guidePortLock.notifyAll() }
try {
// Don't stop until there is a copy in HDFS
@@ -852,9 +833,7 @@ extends Broadcast[T] with Logging with Serializable {
case e: Exception => {
// Assuming exception caused by receiver failure: remove
if (listOfSources != null) {
- listOfSources.synchronized {
- listOfSources = listOfSources - sourceInfo
- }
+ listOfSources.synchronized { listOfSources -= sourceInfo }
}
}
} finally {
@@ -871,9 +850,7 @@ extends Broadcast[T] with Logging with Serializable {
// If skipSourceInfo.hasBlocksBitVector has all bits set to 'true'
// then add skipSourceInfo to setOfCompletedSources. Return blank.
if (skipSourceInfo.hasBlocks == totalBlocks) {
- setOfCompletedSources.synchronized {
- setOfCompletedSources += skipSourceInfo
- }
+ setOfCompletedSources.synchronized { setOfCompletedSources += skipSourceInfo }
return selectedSources
}
@@ -919,8 +896,8 @@ extends Broadcast[T] with Logging with Serializable {
class ServeMultipleRequests
extends Thread with Logging {
- // Server at most MultiTracker.MaxTxSlots peers
- var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxTxSlots)
+ // Server at most MultiTracker.MaxChatSlots peers
+ var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
override def run() {
var serverSocket = new ServerSocket(0)
@@ -928,9 +905,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("ServeMultipleRequests started with " + serverSocket)
- listenPortLock.synchronized {
- listenPortLock.notifyAll()
- }
+ listenPortLock.synchronized { listenPortLock.notifyAll() }
try {
while (!stopBroadcast) {
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index 5e6de903e3..10b90526e8 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -77,11 +77,8 @@ extends Logging {
private var MaxPeersInGuideResponse_ = System.getProperty(
"spark.broadcast.maxPeersInGuideResponse", "4").toInt
- private var MaxRxSlots_ = System.getProperty(
- "spark.broadcast.maxRxSlots", "4").toInt
- private var MaxTxSlots_ = System.getProperty(
- "spark.broadcast.maxTxSlots", "4").toInt
-
+ private var MaxChatSlots_ = System.getProperty(
+ "spark.broadcast.maxChatSlots", "4").toInt
private var MaxChatTime_ = System.getProperty(
"spark.broadcast.maxChatTime", "500").toInt
private var MaxChatBlocks_ = System.getProperty(
@@ -109,9 +106,7 @@ extends Logging {
// BitTorrentBroadcast configs
def MaxPeersInGuideResponse = MaxPeersInGuideResponse_
- def MaxRxSlots = MaxRxSlots_
- def MaxTxSlots = MaxTxSlots_
-
+ def MaxChatSlots = MaxChatSlots_
def MaxChatTime = MaxChatTime_
def MaxChatBlocks = MaxChatBlocks_