aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-22 23:43:11 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-22 23:43:11 -0700
commit7a7123b5253c238ae72afcc31354615fd7ce58b9 (patch)
treee92215e8ce6745043bdbcdd80afe512f3f016a57
parent27362c6523dad79908171a0b9ab32719ea9fccfd (diff)
downloadspark-7a7123b5253c238ae72afcc31354615fd7ce58b9.tar.gz
spark-7a7123b5253c238ae72afcc31354615fd7ce58b9.tar.bz2
spark-7a7123b5253c238ae72afcc31354615fd7ce58b9.zip
Running in local Mesos. Multiple things have to be fixed though. Go through the TODOs...
-rw-r--r--conf/java-opts2
-rw-r--r--src/scala/spark/Broadcast.scala253
-rw-r--r--src/scala/spark/SparkContext.scala5
3 files changed, 147 insertions, 113 deletions
diff --git a/conf/java-opts b/conf/java-opts
index 16aa97ba9f..c7fa245f47 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000
+-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=256 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 1b86d87264..9598005072 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -63,12 +63,7 @@ extends BroadcastRecipe with Logging {
@transient var hasCopyInHDFS = false
// Must call this after all the variables have been created/initialized
- if (!local) {
- val start = System.nanoTime
- sendBroadcast
- val time = (System.nanoTime - start) / 1e9
- logInfo("sendBroadcast took " + time + " s")
- }
+ if (!local) { sendBroadcast }
def sendBroadcast () {
// Store a persistent copy in HDFS
@@ -88,19 +83,14 @@ extends BroadcastRecipe with Logging {
totalBlocks = variableInfo.totalBlocks
hasBlocks = variableInfo.totalBlocks
- // Initialize to all 1
+ // Guide has all the blocks
hasBlocksBitVector = new BitSet (totalBlocks)
hasBlocksBitVector.set (0, totalBlocks)
- val masterSource =
- SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes)
-
- listOfSources = listOfSources + masterSource
-
guideMR = new GuideMultipleRequests
guideMR.setDaemon (true)
guideMR.start
- logInfo ("GuideMultipleRequests started")
+ logInfo ("GuideMultipleRequests started...")
// Must always come AFTER guideMR is created
while (guidePort == -1) {
@@ -112,7 +102,7 @@ extends BroadcastRecipe with Logging {
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
- logInfo ("ServeMultipleRequests started")
+ logInfo ("ServeMultipleRequests started...")
// Must always come AFTER serveMR is created
while (listenPort == -1) {
@@ -121,6 +111,17 @@ extends BroadcastRecipe with Logging {
}
}
+ // Must always come AFTER listenPort is created
+ val masterSource =
+ SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes)
+ hasBlocksBitVector.synchronized {
+ masterSource.hasBlocksBitVector = hasBlocksBitVector
+ }
+
+ // In the beginning, this is the only known source to Guide
+ listOfSources = listOfSources + masterSource
+
+ // Register with the Tracker
BroadcastBT.registerValue (uuid,
SourceInfo (hostAddress, guidePort, totalBlocks, totalBytes))
}
@@ -133,16 +134,16 @@ extends BroadcastRecipe with Logging {
if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T]
} else {
- // Only the first worker in a node can ever be here.
-
+ // Only the first worker in a node can ever be inside this 'else'
initializeWorkerVariables
+ // Start local ServeMultipleRequests thread first
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
- logInfo ("ServeMultipleRequests started")
+ logInfo ("ServeMultipleRequests started...")
- val start = System.nanoTime
+ val start = System.nanoTime
val receptionSucceeded = receiveBroadcast (uuid)
// If does not succeed, then get from HDFS copy
@@ -150,14 +151,15 @@ extends BroadcastRecipe with Logging {
value_ = unBlockifyObject[T]
BroadcastBT.values.put (uuid, value_)
} else {
+ // TODO: This part won't work, cause HDFS writing is turned OFF
val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
value_ = fileIn.readObject.asInstanceOf[T]
BroadcastBT.values.put(uuid, value_)
fileIn.close
- }
+ }
val time = (System.nanoTime - start) / 1e9
- logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
+ logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}
@@ -168,14 +170,19 @@ extends BroadcastRecipe with Logging {
hasBlocksBitVector = null
totalBytes = -1
totalBlocks = -1
- hasBlocks = 0
+ hasBlocks = 0
+
listenPortLock = new Object
totalBlocksLock = new Object
hasBlocksLock = new Object
+
serveMR = null
ttGuide = null
+
hostAddress = InetAddress.getLocalHost.getHostAddress
listenPort = -1
+
+ listOfSources = ListBuffer[SourceInfo] ()
}
private def blockifyObject (obj: T, blockSize: Int): VariableInfo = {
@@ -255,25 +262,27 @@ extends BroadcastRecipe with Logging {
extends Thread with Logging {
override def run = {
// Connect to Guide and send this worker's information
- val clientSocketToGuide =
- new Socket(gInfo.hostAddress, gInfo.listenPort)
- val oosGuide =
- new ObjectOutputStream (clientSocketToGuide.getOutputStream)
- oosGuide.flush
- val oisGuide =
- new ObjectInputStream (clientSocketToGuide.getInputStream)
-
+ var clientSocketToGuide: Socket = null
+ var oosGuide: ObjectOutputStream = null
+ var oisGuide: ObjectInputStream = null
+
// TODO: Do we need a breaking mechanism out of this infinite loop?
while (true) {
+ clientSocketToGuide = new Socket(gInfo.hostAddress, gInfo.listenPort)
+ oosGuide = new ObjectOutputStream (clientSocketToGuide.getOutputStream)
+ oosGuide.flush
+ oisGuide = new ObjectInputStream (clientSocketToGuide.getInputStream)
+
// Send local information
oosGuide.writeObject(getLocalSourceInfo)
oosGuide.flush
- logInfo ("Sent local SourceInfo to Guide")
// Receive source information from Guide
var suitableSources =
oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]]
+ logInfo("Received suitableSources from Master " + suitableSources)
+
// Update local list of sources by adding or replacing
// TODO: There might be some contradiciton on the use of listOfSources
listOfSources.synchronized {
@@ -285,15 +294,15 @@ extends BroadcastRecipe with Logging {
listOfSources = listOfSources + srcInfo
}
}
-
+
+ oisGuide.close
+ oosGuide.close
+ clientSocketToGuide.close
+
// TODO: DO NOT use constant sleep value here
// TODO: Guide should send back a backoff time, somehow
- Thread.sleep (1234)
- }
-
- oisGuide.close
- oosGuide.close
- clientSocketToGuide.close
+ Thread.sleep (1234)
+ }
}
}
@@ -322,8 +331,7 @@ extends BroadcastRecipe with Logging {
oosTracker.flush
gInfo = oisTracker.readObject.asInstanceOf[SourceInfo]
} catch {
- case e: Exception => (gInfo = SourceInfo ("", SourceInfo.TxOverGoToHDFS,
- SourceInfo.UnusedParam, SourceInfo.UnusedParam))
+ case e: Exception => { }
} finally {
if (oisTracker != null) { oisTracker.close }
if (oosTracker != null) { oosTracker.close }
@@ -331,14 +339,22 @@ extends BroadcastRecipe with Logging {
}
retriesLeft -= 1
// TODO: Should wait before retrying. Implement wait function.
- } while (retriesLeft > 0 && gInfo.listenPort < 0)
+ Thread.sleep (1234)
+ } while (retriesLeft > 0 && gInfo.listenPort == SourceInfo.TxNotStartedRetry)
+
logInfo ("Got this guidePort from Tracker: " + gInfo.listenPort)
return gInfo
}
def receiveBroadcast (variableUUID: UUID): Boolean = {
- val gInfo = getGuideInfo (variableUUID)
- if (gInfo.listenPort == SourceInfo.TxOverGoToHDFS) { return false }
+ val gInfo = getGuideInfo (variableUUID)
+
+ if (gInfo.listenPort == SourceInfo.TxOverGoToHDFS ||
+ gInfo.listenPort == SourceInfo.TxNotStartedRetry) {
+ // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go
+ // to HDFS anyway when receiveBroadcast returns false
+ return false
+ }
// Wait until hostAddress and listenPort are created by the
// ServeMultipleRequests thread
@@ -346,7 +362,7 @@ extends BroadcastRecipe with Logging {
listenPortLock.synchronized {
listenPortLock.wait
}
- }
+ }
// Setup initial states of variables
totalBlocks = gInfo.totalBlocks
@@ -361,14 +377,19 @@ extends BroadcastRecipe with Logging {
var ttGuide = new TalkToGuide (gInfo)
ttGuide.setDaemon (true)
ttGuide.start
- logInfo ("TalkToGuide started")
+ logInfo ("TalkToGuide started...")
// Start pController to run TalkToPeer threads
var pcController = new PeerChatterController
pcController.setDaemon (true)
pcController.start
- logInfo ("PeerChatterController started")
-
+ logInfo ("PeerChatterController started...")
+
+ // TODO: Must fix this. Right now blocking here so that others can finish
+ while (true) {Thread.sleep(1234)}
+
+ logInfo ("NEVER HERE")
+
return (hasBlocks == totalBlocks)
}
@@ -383,13 +404,15 @@ extends BroadcastRecipe with Logging {
var threadPool =
Executors.newFixedThreadPool(MAX_PEERS).asInstanceOf[ThreadPoolExecutor]
- var keepWorking = true
- while (keepWorking) {
- while (threadPool.getActiveCount != MAX_PEERS) {
+ while (hasBlocks < totalBlocks) {
+ val maxPeers = Math.min (listOfSources.size, MAX_PEERS)
+ while(threadPool.getActiveCount < maxPeers && hasBlocks < totalBlocks) {
var peerToTalkTo = pickPeerToTalkTo
- if (peerToTalkTo != null) {
+ if (peerToTalkTo != null) {
threadPool.execute (new TalkToPeer (peerToTalkTo))
}
+ // Sleep for a while before starting some more threads
+ Thread.sleep(500)
}
// Sleep for a while before starting some more threads
Thread.sleep (500)
@@ -401,6 +424,8 @@ extends BroadcastRecipe with Logging {
var curPeer: SourceInfo = null
var curMax = 0
+ logInfo ("Picking peers to talk to...")
+
// Find peers that are not connected right now
var peersNotInUse = ListBuffer[SourceInfo] ()
listOfSources.synchronized {
@@ -412,9 +437,9 @@ extends BroadcastRecipe with Logging {
peersNotInUse.foreach { eachSource =>
var tempHasBlocksBitVector: BitSet = null
hasBlocksBitVector.synchronized {
- tempHasBlocksBitVector = hasBlocksBitVector
+ tempHasBlocksBitVector = hasBlocksBitVector.clone.asInstanceOf[BitSet]
}
- tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size)
+ tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size)
tempHasBlocksBitVector.and (eachSource.hasBlocksBitVector)
if (tempHasBlocksBitVector.cardinality > curMax) {
@@ -423,16 +448,23 @@ extends BroadcastRecipe with Logging {
}
}
- return curPeer
+ if (curPeer != null)
+ logInfo ("Peer chosen: " + curPeer + " with " + curPeer.hasBlocksBitVector)
+ else
+ logInfo ("NO peer chosen...")
+
+ return curPeer
}
- class TalkToPeer (peerToTalkTo: SourceInfo)
+ class TalkToPeer (peerToTalkTo: SourceInfo)
extends Thread with Logging {
override def run = {
var peerSocketToSource: Socket = null
var oosSource: ObjectOutputStream = null
var oisSource: ObjectInputStream = null
+ logInfo ("TalkToPeer started... => " + peerToTalkTo)
+
try {
// Connect to the source
peerSocketToSource =
@@ -449,39 +481,39 @@ extends BroadcastRecipe with Logging {
}
// TODO: Who decides which blocks to move back and forth?
-
- while (true) {
- // Send latest SourceInfo
- oosSource.writeObject(getLocalSourceInfo)
- oosSource.flush
-
- // Receive latest SourceInfo from peerToTalkTo
- var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo]
- // Update listOfSources
- listOfSources.synchronized {
- if (listOfSources.contains(newPeerToTalkTo))
- { listOfSources = listOfSources - newPeerToTalkTo }
- listOfSources = listOfSources + newPeerToTalkTo
- }
+ // TODO: Should we transfer multiple instead of just one?
+ // Send latest SourceInfo
+ oosSource.writeObject(getLocalSourceInfo)
+ oosSource.flush
+
+ // Receive latest SourceInfo from peerToTalkTo
+ var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo]
+ // Update listOfSources
+ listOfSources.synchronized {
+ if (listOfSources.contains(newPeerToTalkTo))
+ { listOfSources = listOfSources - newPeerToTalkTo }
+ listOfSources = listOfSources + newPeerToTalkTo
+ }
- val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
- arrayOfBlocks(hasBlocks) = bcBlock
- hasBlocksBitVector.synchronized {
- hasBlocksBitVector.set (bcBlock.blockID)
- }
- hasBlocks += 1
- hasBlocksLock.synchronized {
- hasBlocksLock.notifyAll
- }
- logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock)
- }
+ val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
+ arrayOfBlocks(hasBlocks) = bcBlock
+ hasBlocksBitVector.synchronized {
+ hasBlocksBitVector.set (bcBlock.blockID)
+ }
+ hasBlocks += 1
+ hasBlocksLock.synchronized {
+ hasBlocksLock.notifyAll
+ }
+ logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock)
} catch {
case e: Exception => {
// TODO: Right now assuming an exception == the other end is dead
// Remove this pInfo from listOfSources
- listOfSources.synchronized {
- listOfSources = listOfSources - peerToTalkTo
- }
+ // TODO: We probably should have the following in some form
+// listOfSources.synchronized {
+// logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo)
+// listOfSources = listOfSources - peerToTalkTo
+// }
}
} finally {
if (oisSource != null) { oisSource.close }
@@ -506,7 +538,7 @@ extends BroadcastRecipe with Logging {
serverSocket = new ServerSocket (0)
guidePort = serverSocket.getLocalPort
- logInfo ("GuideMultipleRequests" + serverSocket + " " + guidePort)
+ logInfo ("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
guidePortLock.notifyAll
@@ -559,14 +591,16 @@ extends BroadcastRecipe with Logging {
// Connecting worker is sending in its information
sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
- listOfSources.synchronized {
- // Select a suitable source and send it back to the worker
- selectedSources = selectSuitableSources (sourceInfo)
- logInfo ("Sending selectedSources:" + selectedSources)
- oos.writeObject (selectedSources)
- oos.flush
+ // Select a suitable source and send it back to the worker
+ selectedSources = selectSuitableSources (sourceInfo)
+ logInfo ("Sending selectedSources:" + selectedSources)
+ oos.writeObject (selectedSources)
+ oos.flush
+ listOfSources.synchronized {
// Add this source to the listOfSources
+ if (listOfSources.contains(sourceInfo))
+ { listOfSources = listOfSources - sourceInfo }
listOfSources = listOfSources + sourceInfo
}
} catch {
@@ -613,7 +647,7 @@ extends BroadcastRecipe with Logging {
serverSocket = new ServerSocket (0)
listenPort = serverSocket.getLocalPort
- logInfo ("ServeMultipleRequests" + serverSocket + " " + listenPort)
+ logInfo ("ServeMultipleRequests " + serverSocket)
listenPortLock.synchronized {
listenPortLock.notifyAll
@@ -653,29 +687,28 @@ extends BroadcastRecipe with Logging {
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
- var keepServing = true
+ logInfo ("new ServeSingleRequest is running")
override def run = {
try {
- logInfo ("new ServeSingleRequest is running")
+ // Receive latest SourceInfo from the receiver
+ var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
- do {
- // Receive latest SourceInfo from the receiver
- var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
- // Update listOfSources
- listOfSources.synchronized {
- if (listOfSources.contains(rxSourceInfo))
- { listOfSources = listOfSources - rxSourceInfo }
- listOfSources = listOfSources + rxSourceInfo
- }
-
- // Send latest local SourceInfo to the receiver
- oos.writeObject(getLocalSourceInfo)
- oos.flush
-
- keepServing = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
- // TODO: Perhaps we shouldn't close connection after only one try
- } while (keepServing)
+ logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
+
+ // Update listOfSources
+ listOfSources.synchronized {
+ if (listOfSources.contains(rxSourceInfo))
+ { listOfSources = listOfSources - rxSourceInfo }
+ listOfSources = listOfSources + rxSourceInfo
+ }
+
+ // Send latest local SourceInfo to the receiver
+ oos.writeObject(getLocalSourceInfo)
+ oos.flush
+
+ // TODO: Right now sending a single block. Should reuse connection.
+ pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
} catch {
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
@@ -693,6 +726,7 @@ extends BroadcastRecipe with Logging {
// TODO: Right now picking the first block that matches
private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Boolean = {
+ logInfo ("Picking a block to send...")
// Figure out which blocks to send
rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size)
hasBlocksBitVector.synchronized {
@@ -761,7 +795,6 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int,
var receptionFailed = false
var MBps: Double = BroadcastBT.MaxMBps
- assert (totalBlocks > 0)
var hasBlocksBitVector: BitSet = new BitSet (totalBlocks)
}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 216ea4c0a9..3e9aeb506d 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -20,8 +20,9 @@ class SparkContext(master: String, frameworkName: String) extends Logging {
// TODO: Keep around a weak hash map of values to Cached versions?
// def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
- def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
-
+ // def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)
+ def broadcast[T](value: T) = new BitTorrentBroadcast(value, local)
+
def textFile(path: String) = new HdfsTextFile(this, path)
val LOCAL_REGEX = """local\[([0-9]+)\]""".r