aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-27 15:36:58 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-27 15:36:58 -0700
commite4dc7847ba7ce229c8a30d18579bb283f013879f (patch)
tree478ec72fd7e057a9fc66b80edc0135032dbff753
parent059b6ae4754f61bac457224a03b982ed54a969b1 (diff)
downloadspark-e4dc7847ba7ce229c8a30d18579bb283f013879f.tar.gz
spark-e4dc7847ba7ce229c8a30d18579bb283f013879f.tar.bz2
spark-e4dc7847ba7ce229c8a30d18579bb283f013879f.zip
- Receivers now send back the latest hasBlocksBitVector after every reception
- Will probably settle for the push model. Still not removing the pull related codes - Refactoring...
-rw-r--r--src/scala/spark/Broadcast.scala57
1 files changed, 32 insertions, 25 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index dd4e4cadce..94d0fed8b0 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -272,6 +272,16 @@ extends BroadcastRecipe with Logging {
return localSourceInfo
}
+ // Add new SourceInfo to the listOfSources. Update if it exists already.
+ private def addToListOfSources (newSourceInfo: SourceInfo) = {
+ listOfSources.synchronized {
+ if (listOfSources.contains(newSourceInfo)) {
+ listOfSources = listOfSources - newSourceInfo
+ }
+ listOfSources = listOfSources + newSourceInfo
+ }
+ }
+
class TalkToGuide (gInfo: SourceInfo)
extends Thread with Logging {
override def run = {
@@ -517,21 +527,16 @@ extends BroadcastRecipe with Logging {
oosSource.flush
oisSource =
new ObjectInputStream (peerSocketToSource.getInputStream)
-
+
// Receive latest SourceInfo from peerToTalkTo
var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo]
// Update listOfSources
- listOfSources.synchronized {
- if (listOfSources.contains(newPeerToTalkTo)) {
- listOfSources = listOfSources - newPeerToTalkTo
- }
- listOfSources = listOfSources + newPeerToTalkTo
- }
+ addToListOfSources (newPeerToTalkTo)
// Turn the timer OFF, if the sender responds before timeout
timeOutTimer.cancel
- // Send latest SourceInfo
+ // Send the latest SourceInfo
oosSource.writeObject(getLocalSourceInfo)
oosSource.flush
@@ -549,6 +554,10 @@ extends BroadcastRecipe with Logging {
hasBlocks += 1
logInfo ("Received block: " + bcBlock.blockID + " from " + peerToTalkTo)
}
+
+ // Send the latest SourceInfo
+ oosSource.writeObject(getLocalSourceInfo)
+ oosSource.flush
}
} catch {
case e: Exception => {
@@ -566,6 +575,11 @@ extends BroadcastRecipe with Logging {
}
}
+ // Pick the first one
+ private def pickAndRequestBlock = {
+ // TODO: Will implement it later.
+ }
+
private def cleanUpConnections = {
if (oisSource != null) {
oisSource.close
@@ -653,13 +667,8 @@ extends BroadcastRecipe with Logging {
oos.writeObject (selectedSources)
oos.flush
- listOfSources.synchronized {
- // Add this source to the listOfSources
- if (listOfSources.contains(sourceInfo)) {
- listOfSources = listOfSources - sourceInfo
- }
- listOfSources = listOfSources + sourceInfo
- }
+ // Add this source to the listOfSources
+ addToListOfSources (sourceInfo)
} catch {
case e: Exception => {
// Assuming exception caused by receiver failure: remove
@@ -765,17 +774,9 @@ extends BroadcastRecipe with Logging {
oos.flush
// Receive latest SourceInfo from the receiver
- val rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
-
+ var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
-
- // Update listOfSources
- listOfSources.synchronized {
- if (listOfSources.contains(rxSourceInfo)) {
- listOfSources = listOfSources - rxSourceInfo
- }
- listOfSources = listOfSources + rxSourceInfo
- }
+ addToListOfSources (rxSourceInfo)
// TODO: NOT the most efficient way to do time-based break;
// but using timer can cause a break in the middle :-S
@@ -793,6 +794,12 @@ extends BroadcastRecipe with Logging {
rxSourceInfo.hasBlocksBitVector.set (sentBlock)
}
blocksToSend = blocksToSend - 1
+
+ // Receive latest SourceInfo from the receiver
+ rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
+ logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
+ addToListOfSources (rxSourceInfo)
+
curTime = System.currentTimeMillis
}
} catch {