diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-27 15:36:58 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-27 15:36:58 -0700 |
commit | e4dc7847ba7ce229c8a30d18579bb283f013879f (patch) | |
tree | 478ec72fd7e057a9fc66b80edc0135032dbff753 | |
parent | 059b6ae4754f61bac457224a03b982ed54a969b1 (diff) | |
download | spark-e4dc7847ba7ce229c8a30d18579bb283f013879f.tar.gz spark-e4dc7847ba7ce229c8a30d18579bb283f013879f.tar.bz2 spark-e4dc7847ba7ce229c8a30d18579bb283f013879f.zip |
- Receivers now send back the latest hasBlocksBitVector after every reception
- Will probably settle for the push model. Still not removing the pull related codes
- Refactoring...
-rw-r--r-- | src/scala/spark/Broadcast.scala | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index dd4e4cadce..94d0fed8b0 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -272,6 +272,16 @@ extends BroadcastRecipe with Logging { return localSourceInfo } + // Add new SourceInfo to the listOfSources. Update if it exists already. + private def addToListOfSources (newSourceInfo: SourceInfo) = { + listOfSources.synchronized { + if (listOfSources.contains(newSourceInfo)) { + listOfSources = listOfSources - newSourceInfo + } + listOfSources = listOfSources + newSourceInfo + } + } + class TalkToGuide (gInfo: SourceInfo) extends Thread with Logging { override def run = { @@ -517,21 +527,16 @@ extends BroadcastRecipe with Logging { oosSource.flush oisSource = new ObjectInputStream (peerSocketToSource.getInputStream) - + // Receive latest SourceInfo from peerToTalkTo var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo] // Update listOfSources - listOfSources.synchronized { - if (listOfSources.contains(newPeerToTalkTo)) { - listOfSources = listOfSources - newPeerToTalkTo - } - listOfSources = listOfSources + newPeerToTalkTo - } + addToListOfSources (newPeerToTalkTo) // Turn the timer OFF, if the sender responds before timeout timeOutTimer.cancel - // Send latest SourceInfo + // Send the latest SourceInfo oosSource.writeObject(getLocalSourceInfo) oosSource.flush @@ -549,6 +554,10 @@ extends BroadcastRecipe with Logging { hasBlocks += 1 logInfo ("Received block: " + bcBlock.blockID + " from " + peerToTalkTo) } + + // Send the latest SourceInfo + oosSource.writeObject(getLocalSourceInfo) + oosSource.flush } } catch { case e: Exception => { @@ -566,6 +575,11 @@ extends BroadcastRecipe with Logging { } } + // Pick the first one + private def pickAndRequestBlock = { + // TODO: Will implement it later. + } + private def cleanUpConnections = { if (oisSource != null) { oisSource.close @@ -653,13 +667,8 @@ extends BroadcastRecipe with Logging { oos.writeObject (selectedSources) oos.flush - listOfSources.synchronized { - // Add this source to the listOfSources - if (listOfSources.contains(sourceInfo)) { - listOfSources = listOfSources - sourceInfo - } - listOfSources = listOfSources + sourceInfo - } + // Add this source to the listOfSources + addToListOfSources (sourceInfo) } catch { case e: Exception => { // Assuming exception caused by receiver failure: remove @@ -765,17 +774,9 @@ extends BroadcastRecipe with Logging { oos.flush // Receive latest SourceInfo from the receiver - val rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] - + var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector) - - // Update listOfSources - listOfSources.synchronized { - if (listOfSources.contains(rxSourceInfo)) { - listOfSources = listOfSources - rxSourceInfo - } - listOfSources = listOfSources + rxSourceInfo - } + addToListOfSources (rxSourceInfo) // TODO: NOT the most efficient way to do time-based break; // but using timer can cause a break in the middle :-S @@ -793,6 +794,12 @@ extends BroadcastRecipe with Logging { rxSourceInfo.hasBlocksBitVector.set (sentBlock) } blocksToSend = blocksToSend - 1 + + // Receive latest SourceInfo from the receiver + rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] + logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector) + addToListOfSources (rxSourceInfo) + curTime = System.currentTimeMillis } } catch { |