diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-26 12:56:27 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-26 12:56:27 -0700 |
commit | 3f96c9b7e8275d21d0f323c6218262e049fff504 (patch) | |
tree | c7dfd93c30ae39e3c48e0e40e3c6ae5c6b9b98aa | |
parent | e659efca88d243c95b99596ae29735967c81f99f (diff) | |
download | spark-3f96c9b7e8275d21d0f323c6218262e049fff504.tar.gz spark-3f96c9b7e8275d21d0f323c6218262e049fff504.tar.bz2 spark-3f96c9b7e8275d21d0f323c6218262e049fff504.zip |
Resolved some bugs. Apparently, objects deep inside other objects could be passed as references. Bad Scala!
-rw-r--r-- | src/scala/spark/Broadcast.scala | 87 |
1 files changed, 58 insertions, 29 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 47c3b8fc8d..48a2b5cb43 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -411,15 +411,22 @@ extends BroadcastRecipe with Logging { while (hasBlocks < totalBlocks) { var numThreadsToCreate = Math.min (listOfSources.size, MAX_PEERS) - threadPool.getActiveCount + while(numThreadsToCreate > 0 && hasBlocks < totalBlocks) { var peerToTalkTo = pickPeerToTalkTo if (peerToTalkTo != null) { threadPool.execute (new TalkToPeer (peerToTalkTo)) } + + // Add to peersNowTalking. Remove in the thread. We have to do this + // ASAP, otherwise pickPeerToTalkTo picks the same peer more than once + peersNowTalking.synchronized { + peersNowTalking = peersNowTalking + peerToTalkTo + } + numThreadsToCreate = numThreadsToCreate - 1 - // Sleep for a while before starting some more threads - Thread.sleep(500) } + // Sleep for a while before starting some more threads Thread.sleep (500) } @@ -477,13 +484,9 @@ extends BroadcastRecipe with Logging { oisSource = new ObjectInputStream (peerSocketToSource.getInputStream) - // Add to peersNowTalking - peersNowTalking.synchronized { - peersNowTalking = peersNowTalking + peerToTalkTo - } - // TODO: Who decides which blocks to move back and forth? // TODO: Should we transfer multiple instead of just one? + // Send latest SourceInfo oosSource.writeObject(getLocalSourceInfo) oosSource.flush @@ -497,21 +500,25 @@ extends BroadcastRecipe with Logging { listOfSources = listOfSources + newPeerToTalkTo } - val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] - arrayOfBlocks(bcBlock.blockID) = bcBlock - hasBlocksBitVector.synchronized { - hasBlocksBitVector.set (bcBlock.blockID) - } - hasBlocks += 1 - hasBlocksLock.synchronized { - hasBlocksLock.notifyAll + // TODO: There is a problem with closing this way + while (hasBlocks < totalBlocks) { + val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] + arrayOfBlocks(bcBlock.blockID) = bcBlock + hasBlocksBitVector.synchronized { + hasBlocksBitVector.set (bcBlock.blockID) + } + hasBlocks += 1 + hasBlocksLock.synchronized { + hasBlocksLock.notifyAll + } + logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock) } - logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock) } catch { case e: Exception => { - // TODO: Right now assuming an exception == the other end is dead + logInfo ("TalktoPeer had a " + e) // Remove this pInfo from listOfSources - // TODO: We probably should have the following in some form + // TODO: We probably should have the following in some form, but not + // really here. This exception can happen if the sender just breaks connection // listOfSources.synchronized { // logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo) // listOfSources = listOfSources - peerToTalkTo @@ -679,22 +686,25 @@ extends BroadcastRecipe with Logging { } } } finally { - serverSocket.close + if (serverSocket != null) { serverSocket.close } } } class ServeSingleRequest (val clientSocket: Socket) extends Thread with Logging { + // TODO: This has to be fixed somehow + private val MAX_MILLIS_TO_CHAT = 50 + private val oos = new ObjectOutputStream (clientSocket.getOutputStream) oos.flush private val ois = new ObjectInputStream (clientSocket.getInputStream) - + logInfo ("new ServeSingleRequest is running") override def run = { try { // Receive latest SourceInfo from the receiver - var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] + val rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector) @@ -709,12 +719,20 @@ extends BroadcastRecipe with Logging { oos.writeObject(getLocalSourceInfo) oos.flush - // TODO: Right now sending a single block. Should reuse connection. - pickAndSendBlock (rxSourceInfo.hasBlocksBitVector) + // TODO: NOT the most efficient way to do time-based break; but using timer can cause a break in the middle :-S + val startTime = System.currentTimeMillis + var curTime = startTime + var keepSending = true + + while (keepSending && curTime - startTime < MAX_MILLIS_TO_CHAT) { + keepSending = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector) + curTime = System.currentTimeMillis + } } catch { // TODO: Need to add better exception handling here // If something went wrong, e.g., the worker at the other end died etc. // then close everything up + // Exception can happen if the receiver stops receiving case e: Exception => { logInfo ("ServeSingleRequest had a " + e) } @@ -733,17 +751,19 @@ extends BroadcastRecipe with Logging { var nextIndex = -1 logInfo ("Picking a block to send...") - + // Figure out which blocks the receiver doesn't have - rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size) + var tempHasBlocksBitVector = + rxHasBlocksBitVector.clone.asInstanceOf[BitSet] + tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size) hasBlocksBitVector.synchronized { - rxHasBlocksBitVector.and (hasBlocksBitVector) + tempHasBlocksBitVector.and (hasBlocksBitVector) } // Traverse over all the blocks numCopiesSent.synchronized { do { - nextIndex = rxHasBlocksBitVector.nextSetBit(nextIndex + 1) + nextIndex = tempHasBlocksBitVector.nextSetBit(nextIndex + 1) if (nextIndex != -1 && numCopiesSent(nextIndex) < minCopies) { minCopies = numCopiesSent(nextIndex) numCopiesSent(nextIndex) = numCopiesSent(nextIndex) + 1 @@ -752,14 +772,23 @@ extends BroadcastRecipe with Logging { } while (nextIndex != -1) } - if (blockIndex == -1) { return false } + if (blockIndex == -1) { + logInfo ("No block to send...") + return false + } try { oos.writeObject (arrayOfBlocks(blockIndex)) oos.flush - } catch { case e: Exception => { } } + + // Update local copy to avoid duplication + rxHasBlocksBitVector.set (blockIndex) + } catch { + case e: Exception => { } + } logInfo ("Sent block: " + blockIndex + " " + arrayOfBlocks(blockIndex)) + return true } } |