aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-20 17:27:02 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-20 17:27:02 -0700
commit53bd64afe750711c8033117b31dc812062a8d686 (patch)
tree3f5b860dfc0bd4a0a891baa5b0fb6e8633677d91
parent05bca235a734105f3bc6d55a5ba8a51928fe2302 (diff)
downloadspark-53bd64afe750711c8033117b31dc812062a8d686.tar.gz
spark-53bd64afe750711c8033117b31dc812062a8d686.tar.bz2
spark-53bd64afe750711c8033117b31dc812062a8d686.zip
- Added TalkToPeer class which will allow peers to communicate between them.
- Still need the controller class that will decide which peers to communicate with
-rw-r--r--src/scala/spark/Broadcast.scala153
1 files changed, 79 insertions, 74 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 5666ba999d..b36d813fdc 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -334,64 +334,65 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean)
logInfo ("TalkToGuide started")
// TODO:
+
+
return (hasBlocks == totalBlocks)
}
- // Tries to receive broadcast from the source and returns Boolean status.
- // This might be called multiple times to retry a defined number of times.
- private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = {
- var clientSocketToSource: Socket = null
- var oosSource: ObjectOutputStream = null
- var oisSource: ObjectInputStream = null
-
- var receptionSucceeded = false
- try {
- // Connect to the source to get the object itself
- clientSocketToSource =
- new Socket (sourceInfo.hostAddress, sourceInfo.listenPort)
- oosSource =
- new ObjectOutputStream (clientSocketToSource.getOutputStream)
- oosSource.flush
- oisSource =
- new ObjectInputStream (clientSocketToSource.getInputStream)
-
- logInfo ("Inside receiveSingleTransmission")
- logInfo ("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
-
- // Send hasBlocksBitVector
- oosSource.writeObject(hasBlocksBitVector)
- oosSource.flush
-
- // Send the range
- oosSource.writeObject((hasBlocks, totalBlocks))
- oosSource.flush
+ class TalkToPeer (sourceInfo: SourceInfo)
+ extends Thread with Logging {
+ override def run = {
+ var peerSocketToSource: Socket = null
+ var oosSource: ObjectOutputStream = null
+ var oisSource: ObjectInputStream = null
- for (i <- hasBlocks until totalBlocks) {
- val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
- arrayOfBlocks(hasBlocks) = bcBlock
- hasBlocksBitVector.set (bcBlock.blockID)
- hasBlocks += 1
- // Set to true if at least one block is received
- receptionSucceeded = true
- hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll
+ try {
+ // Connect to the source
+ peerSocketToSource =
+ new Socket (sourceInfo.hostAddress, sourceInfo.listenPort)
+ oosSource =
+ new ObjectOutputStream (peerSocketToSource.getOutputStream)
+ oosSource.flush
+ oisSource =
+ new ObjectInputStream (peerSocketToSource.getInputStream)
+
+ // TODO: Who decides which blocks to move back and forth?
+ // TODO: Letting the source decide for now
+
+ while (true) {
+ // Send hasBlocksBitVector
+ oosSource.writeObject(hasBlocksBitVector)
+ oosSource.flush
+
+ // Receive hasBlocksBitVector
+ // TODO: Need to update this information in the listOfSources
+ var txHasBlocksBitVector = oisSource.readObject.asInstanceOf[BitSet]
+
+ val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
+ arrayOfBlocks(hasBlocks) = bcBlock
+ hasBlocksBitVector.set (bcBlock.blockID)
+ hasBlocks += 1
+ hasBlocksLock.synchronized {
+ hasBlocksLock.notifyAll
+ }
+ logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock)
+ }
+ } catch {
+ case e: Exception => {
+ // TODO: Right now assuming an exception == the other end is dead
+ // Remove this pInfo from listOfSources
+ listOfSources.synchronized {
+ listOfSources = listOfSources - sourceInfo
+ }
}
- logInfo ("Received block: " + i + " " + bcBlock)
- }
- logInfo ("After the receive loop")
- } catch {
- case e: Exception => {
- logInfo ("receiveSingleTransmission had a " + e)
+ } finally {
+ if (oisSource != null) { oisSource.close }
+ if (oosSource != null) { oosSource.close }
+ if (peerSocketToSource != null) { peerSocketToSource.close }
}
- } finally {
- if (oisSource != null) { oisSource.close }
- if (oosSource != null) { oosSource.close }
- if (clientSocketToSource != null) { clientSocketToSource.close }
}
-
- return receptionSucceeded
- }
+ }
class GuideMultipleRequests extends Thread with Logging {
override def run = {
@@ -547,16 +548,24 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean)
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
+ var keepServing = true
+
def run = {
try {
logInfo ("new ServeSingleRequest is running")
+ do {
// Receive hasBlocksBitVector from the receiver
var rxHasBlocksBitVector = ois.readObject.asInstanceOf[BitSet]
+ // TODO: Update this info in listOfSources
- // Receive range to send
- var sendRange = ois.readObject.asInstanceOf[(Int, Int)]
- sendObject (sendRange._1, sendRange._2)
+ // Send hasBlocksBitVector to the receiver
+ oos.writeObject(hasBlocksBitVector)
+ oos.flush
+
+ keepServing = pickAndSendBlock (rxHasBlocksBitVector)
+ // TODO: Perhaps we shouldn't close connection after only one try
+ } while (keepServing)
} catch {
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
@@ -572,28 +581,24 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
- private def sendObject (sendFrom: Int, sendUntil: Int) = {
- // Wait till receiving the SourceInfo from Master
- while (totalBlocks == -1) {
- totalBlocksLock.synchronized {
- totalBlocksLock.wait
- }
- }
-
- for (i <- sendFrom until sendUntil) {
- while (i == hasBlocks) {
- hasBlocksLock.synchronized {
- hasBlocksLock.wait
- }
- }
- try {
- oos.writeObject (arrayOfBlocks(i))
- oos.flush
- } catch {
- case e: Exception => { }
- }
- logInfo ("Send block: " + i + " " + arrayOfBlocks(i))
+ // TODO: Right now picking the first block that matches
+ private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Boolean = {
+ // Figure out which blocks to send
+ rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size)
+ hasBlocksBitVector.synchronized {
+ rxHasBlocksBitVector.and (hasBlocksBitVector)
}
+
+ var nextIndex = rxHasBlocksBitVector.nextSetBit (0)
+
+ if (nextIndex == -1) { return false }
+
+ try {
+ oos.writeObject (arrayOfBlocks(nextIndex))
+ oos.flush
+ } catch { case e: Exception => { } }
+ logInfo ("Sent block: " + nextIndex + " " + arrayOfBlocks(nextIndex))
+ return true
}
}
}