aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-26 12:56:27 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-26 12:56:27 -0700
commit3f96c9b7e8275d21d0f323c6218262e049fff504 (patch)
treec7dfd93c30ae39e3c48e0e40e3c6ae5c6b9b98aa
parente659efca88d243c95b99596ae29735967c81f99f (diff)
downloadspark-3f96c9b7e8275d21d0f323c6218262e049fff504.tar.gz
spark-3f96c9b7e8275d21d0f323c6218262e049fff504.tar.bz2
spark-3f96c9b7e8275d21d0f323c6218262e049fff504.zip
Resolved some bugs. Apparently, objects deep inside other objects could be passed as references. Bad Scala!
-rw-r--r--src/scala/spark/Broadcast.scala87
1 files changed, 58 insertions, 29 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 47c3b8fc8d..48a2b5cb43 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -411,15 +411,22 @@ extends BroadcastRecipe with Logging {
while (hasBlocks < totalBlocks) {
var numThreadsToCreate = Math.min (listOfSources.size, MAX_PEERS) -
threadPool.getActiveCount
+
while(numThreadsToCreate > 0 && hasBlocks < totalBlocks) {
var peerToTalkTo = pickPeerToTalkTo
if (peerToTalkTo != null) {
threadPool.execute (new TalkToPeer (peerToTalkTo))
}
+
+ // Add to peersNowTalking. Remove in the thread. We have to do this
+ // ASAP, otherwise pickPeerToTalkTo picks the same peer more than once
+ peersNowTalking.synchronized {
+ peersNowTalking = peersNowTalking + peerToTalkTo
+ }
+
numThreadsToCreate = numThreadsToCreate - 1
- // Sleep for a while before starting some more threads
- Thread.sleep(500)
}
+
// Sleep for a while before starting some more threads
Thread.sleep (500)
}
@@ -477,13 +484,9 @@ extends BroadcastRecipe with Logging {
oisSource =
new ObjectInputStream (peerSocketToSource.getInputStream)
- // Add to peersNowTalking
- peersNowTalking.synchronized {
- peersNowTalking = peersNowTalking + peerToTalkTo
- }
-
// TODO: Who decides which blocks to move back and forth?
// TODO: Should we transfer multiple instead of just one?
+
// Send latest SourceInfo
oosSource.writeObject(getLocalSourceInfo)
oosSource.flush
@@ -497,21 +500,25 @@ extends BroadcastRecipe with Logging {
listOfSources = listOfSources + newPeerToTalkTo
}
- val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
- arrayOfBlocks(bcBlock.blockID) = bcBlock
- hasBlocksBitVector.synchronized {
- hasBlocksBitVector.set (bcBlock.blockID)
- }
- hasBlocks += 1
- hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll
+ // TODO: There is a problem with closing this way
+ while (hasBlocks < totalBlocks) {
+ val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
+ arrayOfBlocks(bcBlock.blockID) = bcBlock
+ hasBlocksBitVector.synchronized {
+ hasBlocksBitVector.set (bcBlock.blockID)
+ }
+ hasBlocks += 1
+ hasBlocksLock.synchronized {
+ hasBlocksLock.notifyAll
+ }
+ logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock)
}
- logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock)
} catch {
case e: Exception => {
- // TODO: Right now assuming an exception == the other end is dead
+ logInfo ("TalktoPeer had a " + e)
// Remove this pInfo from listOfSources
- // TODO: We probably should have the following in some form
+ // TODO: We probably should have the following in some form, but not
+ // really here. This exception can happen if the sender just breaks connection
// listOfSources.synchronized {
// logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo)
// listOfSources = listOfSources - peerToTalkTo
@@ -679,22 +686,25 @@ extends BroadcastRecipe with Logging {
}
}
} finally {
- serverSocket.close
+ if (serverSocket != null) { serverSocket.close }
}
}
class ServeSingleRequest (val clientSocket: Socket)
extends Thread with Logging {
+ // TODO: This has to be fixed somehow
+ private val MAX_MILLIS_TO_CHAT = 50
+
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
-
+
logInfo ("new ServeSingleRequest is running")
override def run = {
try {
// Receive latest SourceInfo from the receiver
- var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
+ val rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
@@ -709,12 +719,20 @@ extends BroadcastRecipe with Logging {
oos.writeObject(getLocalSourceInfo)
oos.flush
- // TODO: Right now sending a single block. Should reuse connection.
- pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
+ // TODO: NOT the most efficient way to do time-based break; but using timer can cause a break in the middle :-S
+ val startTime = System.currentTimeMillis
+ var curTime = startTime
+ var keepSending = true
+
+ while (keepSending && curTime - startTime < MAX_MILLIS_TO_CHAT) {
+ keepSending = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
+ curTime = System.currentTimeMillis
+ }
} catch {
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
+ // Exception can happen if the receiver stops receiving
case e: Exception => {
logInfo ("ServeSingleRequest had a " + e)
}
@@ -733,17 +751,19 @@ extends BroadcastRecipe with Logging {
var nextIndex = -1
logInfo ("Picking a block to send...")
-
+
// Figure out which blocks the receiver doesn't have
- rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size)
+ var tempHasBlocksBitVector =
+ rxHasBlocksBitVector.clone.asInstanceOf[BitSet]
+ tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size)
hasBlocksBitVector.synchronized {
- rxHasBlocksBitVector.and (hasBlocksBitVector)
+ tempHasBlocksBitVector.and (hasBlocksBitVector)
}
// Traverse over all the blocks
numCopiesSent.synchronized {
do {
- nextIndex = rxHasBlocksBitVector.nextSetBit(nextIndex + 1)
+ nextIndex = tempHasBlocksBitVector.nextSetBit(nextIndex + 1)
if (nextIndex != -1 && numCopiesSent(nextIndex) < minCopies) {
minCopies = numCopiesSent(nextIndex)
numCopiesSent(nextIndex) = numCopiesSent(nextIndex) + 1
@@ -752,14 +772,23 @@ extends BroadcastRecipe with Logging {
} while (nextIndex != -1)
}
- if (blockIndex == -1) { return false }
+ if (blockIndex == -1) {
+ logInfo ("No block to send...")
+ return false
+ }
try {
oos.writeObject (arrayOfBlocks(blockIndex))
oos.flush
- } catch { case e: Exception => { } }
+
+ // Update local copy to avoid duplication
+ rxHasBlocksBitVector.set (blockIndex)
+ } catch {
+ case e: Exception => { }
+ }
logInfo ("Sent block: " + blockIndex + " " + arrayOfBlocks(blockIndex))
+
return true
}
}