diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-22 23:43:11 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-22 23:43:11 -0700 |
commit | 7a7123b5253c238ae72afcc31354615fd7ce58b9 (patch) | |
tree | e92215e8ce6745043bdbcdd80afe512f3f016a57 | |
parent | 27362c6523dad79908171a0b9ab32719ea9fccfd (diff) | |
download | spark-7a7123b5253c238ae72afcc31354615fd7ce58b9.tar.gz spark-7a7123b5253c238ae72afcc31354615fd7ce58b9.tar.bz2 spark-7a7123b5253c238ae72afcc31354615fd7ce58b9.zip |
Running in local Mesos. Multiple things have to be fixed though. Go through the TODOs...
-rw-r--r-- | conf/java-opts | 2 | ||||
-rw-r--r-- | src/scala/spark/Broadcast.scala | 253 | ||||
-rw-r--r-- | src/scala/spark/SparkContext.scala | 5 |
3 files changed, 147 insertions, 113 deletions
diff --git a/conf/java-opts b/conf/java-opts index 16aa97ba9f..c7fa245f47 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 +-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=256 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 1b86d87264..9598005072 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -63,12 +63,7 @@ extends BroadcastRecipe with Logging { @transient var hasCopyInHDFS = false // Must call this after all the variables have been created/initialized - if (!local) { - val start = System.nanoTime - sendBroadcast - val time = (System.nanoTime - start) / 1e9 - logInfo("sendBroadcast took " + time + " s") - } + if (!local) { sendBroadcast } def sendBroadcast () { // Store a persistent copy in HDFS @@ -88,19 +83,14 @@ extends BroadcastRecipe with Logging { totalBlocks = variableInfo.totalBlocks hasBlocks = variableInfo.totalBlocks - // Initialize to all 1 + // Guide has all the blocks hasBlocksBitVector = new BitSet (totalBlocks) hasBlocksBitVector.set (0, totalBlocks) - val masterSource = - SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes) - - listOfSources = listOfSources + masterSource - guideMR = new GuideMultipleRequests guideMR.setDaemon (true) guideMR.start - logInfo ("GuideMultipleRequests started") + logInfo ("GuideMultipleRequests started...") // Must always come AFTER guideMR is created while (guidePort == -1) { @@ -112,7 +102,7 @@ extends BroadcastRecipe with Logging { serveMR = new ServeMultipleRequests serveMR.setDaemon (true) serveMR.start - logInfo ("ServeMultipleRequests started") + logInfo ("ServeMultipleRequests started...") // Must always come AFTER serveMR is created while (listenPort == -1) { @@ -121,6 +111,17 @@ extends BroadcastRecipe with Logging { } } + // Must always come AFTER listenPort is created + val masterSource = + SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes) + hasBlocksBitVector.synchronized { + masterSource.hasBlocksBitVector = hasBlocksBitVector + } + + // In the beginning, this is the only known source to Guide + listOfSources = listOfSources + masterSource + + // Register with the Tracker BroadcastBT.registerValue (uuid, SourceInfo (hostAddress, guidePort, totalBlocks, totalBytes)) } @@ -133,16 +134,16 @@ extends BroadcastRecipe with Logging { if (cachedVal != null) { value_ = cachedVal.asInstanceOf[T] } else { - // Only the first worker in a node can ever be here. - + // Only the first worker in a node can ever be inside this 'else' initializeWorkerVariables + // Start local ServeMultipleRequests thread first serveMR = new ServeMultipleRequests serveMR.setDaemon (true) serveMR.start - logInfo ("ServeMultipleRequests started") + logInfo ("ServeMultipleRequests started...") - val start = System.nanoTime + val start = System.nanoTime val receptionSucceeded = receiveBroadcast (uuid) // If does not succeed, then get from HDFS copy @@ -150,14 +151,15 @@ extends BroadcastRecipe with Logging { value_ = unBlockifyObject[T] BroadcastBT.values.put (uuid, value_) } else { + // TODO: This part won't work, cause HDFS writing is turned OFF val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) value_ = fileIn.readObject.asInstanceOf[T] BroadcastBT.values.put(uuid, value_) fileIn.close - } + } val time = (System.nanoTime - start) / 1e9 - logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s") + logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s") } } } @@ -168,14 +170,19 @@ extends BroadcastRecipe with Logging { hasBlocksBitVector = null totalBytes = -1 totalBlocks = -1 - hasBlocks = 0 + hasBlocks = 0 + listenPortLock = new Object totalBlocksLock = new Object hasBlocksLock = new Object + serveMR = null ttGuide = null + hostAddress = InetAddress.getLocalHost.getHostAddress listenPort = -1 + + listOfSources = ListBuffer[SourceInfo] () } private def blockifyObject (obj: T, blockSize: Int): VariableInfo = { @@ -255,25 +262,27 @@ extends BroadcastRecipe with Logging { extends Thread with Logging { override def run = { // Connect to Guide and send this worker's information - val clientSocketToGuide = - new Socket(gInfo.hostAddress, gInfo.listenPort) - val oosGuide = - new ObjectOutputStream (clientSocketToGuide.getOutputStream) - oosGuide.flush - val oisGuide = - new ObjectInputStream (clientSocketToGuide.getInputStream) - + var clientSocketToGuide: Socket = null + var oosGuide: ObjectOutputStream = null + var oisGuide: ObjectInputStream = null + // TODO: Do we need a breaking mechanism out of this infinite loop? while (true) { + clientSocketToGuide = new Socket(gInfo.hostAddress, gInfo.listenPort) + oosGuide = new ObjectOutputStream (clientSocketToGuide.getOutputStream) + oosGuide.flush + oisGuide = new ObjectInputStream (clientSocketToGuide.getInputStream) + // Send local information oosGuide.writeObject(getLocalSourceInfo) oosGuide.flush - logInfo ("Sent local SourceInfo to Guide") // Receive source information from Guide var suitableSources = oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]] + logInfo("Received suitableSources from Master " + suitableSources) + // Update local list of sources by adding or replacing // TODO: There might be some contradiciton on the use of listOfSources listOfSources.synchronized { @@ -285,15 +294,15 @@ extends BroadcastRecipe with Logging { listOfSources = listOfSources + srcInfo } } - + + oisGuide.close + oosGuide.close + clientSocketToGuide.close + // TODO: DO NOT use constant sleep value here // TODO: Guide should send back a backoff time, somehow - Thread.sleep (1234) - } - - oisGuide.close - oosGuide.close - clientSocketToGuide.close + Thread.sleep (1234) + } } } @@ -322,8 +331,7 @@ extends BroadcastRecipe with Logging { oosTracker.flush gInfo = oisTracker.readObject.asInstanceOf[SourceInfo] } catch { - case e: Exception => (gInfo = SourceInfo ("", SourceInfo.TxOverGoToHDFS, - SourceInfo.UnusedParam, SourceInfo.UnusedParam)) + case e: Exception => { } } finally { if (oisTracker != null) { oisTracker.close } if (oosTracker != null) { oosTracker.close } @@ -331,14 +339,22 @@ extends BroadcastRecipe with Logging { } retriesLeft -= 1 // TODO: Should wait before retrying. Implement wait function. - } while (retriesLeft > 0 && gInfo.listenPort < 0) + Thread.sleep (1234) + } while (retriesLeft > 0 && gInfo.listenPort == SourceInfo.TxNotStartedRetry) + logInfo ("Got this guidePort from Tracker: " + gInfo.listenPort) return gInfo } def receiveBroadcast (variableUUID: UUID): Boolean = { - val gInfo = getGuideInfo (variableUUID) - if (gInfo.listenPort == SourceInfo.TxOverGoToHDFS) { return false } + val gInfo = getGuideInfo (variableUUID) + + if (gInfo.listenPort == SourceInfo.TxOverGoToHDFS || + gInfo.listenPort == SourceInfo.TxNotStartedRetry) { + // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go + // to HDFS anyway when receiveBroadcast returns false + return false + } // Wait until hostAddress and listenPort are created by the // ServeMultipleRequests thread @@ -346,7 +362,7 @@ extends BroadcastRecipe with Logging { listenPortLock.synchronized { listenPortLock.wait } - } + } // Setup initial states of variables totalBlocks = gInfo.totalBlocks @@ -361,14 +377,19 @@ extends BroadcastRecipe with Logging { var ttGuide = new TalkToGuide (gInfo) ttGuide.setDaemon (true) ttGuide.start - logInfo ("TalkToGuide started") + logInfo ("TalkToGuide started...") // Start pController to run TalkToPeer threads var pcController = new PeerChatterController pcController.setDaemon (true) pcController.start - logInfo ("PeerChatterController started") - + logInfo ("PeerChatterController started...") + + // TODO: Must fix this. Right now blocking here so that others can finish + while (true) {Thread.sleep(1234)} + + logInfo ("NEVER HERE") + return (hasBlocks == totalBlocks) } @@ -383,13 +404,15 @@ extends BroadcastRecipe with Logging { var threadPool = Executors.newFixedThreadPool(MAX_PEERS).asInstanceOf[ThreadPoolExecutor] - var keepWorking = true - while (keepWorking) { - while (threadPool.getActiveCount != MAX_PEERS) { + while (hasBlocks < totalBlocks) { + val maxPeers = Math.min (listOfSources.size, MAX_PEERS) + while(threadPool.getActiveCount < maxPeers && hasBlocks < totalBlocks) { var peerToTalkTo = pickPeerToTalkTo - if (peerToTalkTo != null) { + if (peerToTalkTo != null) { threadPool.execute (new TalkToPeer (peerToTalkTo)) } + // Sleep for a while before starting some more threads + Thread.sleep(500) } // Sleep for a while before starting some more threads Thread.sleep (500) @@ -401,6 +424,8 @@ extends BroadcastRecipe with Logging { var curPeer: SourceInfo = null var curMax = 0 + logInfo ("Picking peers to talk to...") + // Find peers that are not connected right now var peersNotInUse = ListBuffer[SourceInfo] () listOfSources.synchronized { @@ -412,9 +437,9 @@ extends BroadcastRecipe with Logging { peersNotInUse.foreach { eachSource => var tempHasBlocksBitVector: BitSet = null hasBlocksBitVector.synchronized { - tempHasBlocksBitVector = hasBlocksBitVector + tempHasBlocksBitVector = hasBlocksBitVector.clone.asInstanceOf[BitSet] } - tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size) + tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size) tempHasBlocksBitVector.and (eachSource.hasBlocksBitVector) if (tempHasBlocksBitVector.cardinality > curMax) { @@ -423,16 +448,23 @@ extends BroadcastRecipe with Logging { } } - return curPeer + if (curPeer != null) + logInfo ("Peer chosen: " + curPeer + " with " + curPeer.hasBlocksBitVector) + else + logInfo ("NO peer chosen...") + + return curPeer } - class TalkToPeer (peerToTalkTo: SourceInfo) + class TalkToPeer (peerToTalkTo: SourceInfo) extends Thread with Logging { override def run = { var peerSocketToSource: Socket = null var oosSource: ObjectOutputStream = null var oisSource: ObjectInputStream = null + logInfo ("TalkToPeer started... => " + peerToTalkTo) + try { // Connect to the source peerSocketToSource = @@ -449,39 +481,39 @@ extends BroadcastRecipe with Logging { } // TODO: Who decides which blocks to move back and forth? - - while (true) { - // Send latest SourceInfo - oosSource.writeObject(getLocalSourceInfo) - oosSource.flush - - // Receive latest SourceInfo from peerToTalkTo - var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo] - // Update listOfSources - listOfSources.synchronized { - if (listOfSources.contains(newPeerToTalkTo)) - { listOfSources = listOfSources - newPeerToTalkTo } - listOfSources = listOfSources + newPeerToTalkTo - } + // TODO: Should we transfer multiple instead of just one? + // Send latest SourceInfo + oosSource.writeObject(getLocalSourceInfo) + oosSource.flush + + // Receive latest SourceInfo from peerToTalkTo + var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo] + // Update listOfSources + listOfSources.synchronized { + if (listOfSources.contains(newPeerToTalkTo)) + { listOfSources = listOfSources - newPeerToTalkTo } + listOfSources = listOfSources + newPeerToTalkTo + } - val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] - arrayOfBlocks(hasBlocks) = bcBlock - hasBlocksBitVector.synchronized { - hasBlocksBitVector.set (bcBlock.blockID) - } - hasBlocks += 1 - hasBlocksLock.synchronized { - hasBlocksLock.notifyAll - } - logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock) - } + val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] + arrayOfBlocks(hasBlocks) = bcBlock + hasBlocksBitVector.synchronized { + hasBlocksBitVector.set (bcBlock.blockID) + } + hasBlocks += 1 + hasBlocksLock.synchronized { + hasBlocksLock.notifyAll + } + logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock) } catch { case e: Exception => { // TODO: Right now assuming an exception == the other end is dead // Remove this pInfo from listOfSources - listOfSources.synchronized { - listOfSources = listOfSources - peerToTalkTo - } + // TODO: We probably should have the following in some form +// listOfSources.synchronized { +// logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo) +// listOfSources = listOfSources - peerToTalkTo +// } } } finally { if (oisSource != null) { oisSource.close } @@ -506,7 +538,7 @@ extends BroadcastRecipe with Logging { serverSocket = new ServerSocket (0) guidePort = serverSocket.getLocalPort - logInfo ("GuideMultipleRequests" + serverSocket + " " + guidePort) + logInfo ("GuideMultipleRequests => " + serverSocket + " " + guidePort) guidePortLock.synchronized { guidePortLock.notifyAll @@ -559,14 +591,16 @@ extends BroadcastRecipe with Logging { // Connecting worker is sending in its information sourceInfo = ois.readObject.asInstanceOf[SourceInfo] - listOfSources.synchronized { - // Select a suitable source and send it back to the worker - selectedSources = selectSuitableSources (sourceInfo) - logInfo ("Sending selectedSources:" + selectedSources) - oos.writeObject (selectedSources) - oos.flush + // Select a suitable source and send it back to the worker + selectedSources = selectSuitableSources (sourceInfo) + logInfo ("Sending selectedSources:" + selectedSources) + oos.writeObject (selectedSources) + oos.flush + listOfSources.synchronized { // Add this source to the listOfSources + if (listOfSources.contains(sourceInfo)) + { listOfSources = listOfSources - sourceInfo } listOfSources = listOfSources + sourceInfo } } catch { @@ -613,7 +647,7 @@ extends BroadcastRecipe with Logging { serverSocket = new ServerSocket (0) listenPort = serverSocket.getLocalPort - logInfo ("ServeMultipleRequests" + serverSocket + " " + listenPort) + logInfo ("ServeMultipleRequests " + serverSocket) listenPortLock.synchronized { listenPortLock.notifyAll @@ -653,29 +687,28 @@ extends BroadcastRecipe with Logging { oos.flush private val ois = new ObjectInputStream (clientSocket.getInputStream) - var keepServing = true + logInfo ("new ServeSingleRequest is running") override def run = { try { - logInfo ("new ServeSingleRequest is running") + // Receive latest SourceInfo from the receiver + var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] - do { - // Receive latest SourceInfo from the receiver - var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo] - // Update listOfSources - listOfSources.synchronized { - if (listOfSources.contains(rxSourceInfo)) - { listOfSources = listOfSources - rxSourceInfo } - listOfSources = listOfSources + rxSourceInfo - } - - // Send latest local SourceInfo to the receiver - oos.writeObject(getLocalSourceInfo) - oos.flush - - keepServing = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector) - // TODO: Perhaps we shouldn't close connection after only one try - } while (keepServing) + logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector) + + // Update listOfSources + listOfSources.synchronized { + if (listOfSources.contains(rxSourceInfo)) + { listOfSources = listOfSources - rxSourceInfo } + listOfSources = listOfSources + rxSourceInfo + } + + // Send latest local SourceInfo to the receiver + oos.writeObject(getLocalSourceInfo) + oos.flush + + // TODO: Right now sending a single block. Should reuse connection. + pickAndSendBlock (rxSourceInfo.hasBlocksBitVector) } catch { // TODO: Need to add better exception handling here // If something went wrong, e.g., the worker at the other end died etc. @@ -693,6 +726,7 @@ extends BroadcastRecipe with Logging { // TODO: Right now picking the first block that matches private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Boolean = { + logInfo ("Picking a block to send...") // Figure out which blocks to send rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size) hasBlocksBitVector.synchronized { @@ -761,7 +795,6 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int, var receptionFailed = false var MBps: Double = BroadcastBT.MaxMBps - assert (totalBlocks > 0) var hasBlocksBitVector: BitSet = new BitSet (totalBlocks) } diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala index 216ea4c0a9..3e9aeb506d 100644 --- a/src/scala/spark/SparkContext.scala +++ b/src/scala/spark/SparkContext.scala @@ -20,8 +20,9 @@ class SparkContext(master: String, frameworkName: String) extends Logging { // TODO: Keep around a weak hash map of values to Cached versions? // def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local) - def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local) - + // def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local) + def broadcast[T](value: T) = new BitTorrentBroadcast(value, local) + def textFile(path: String) = new HdfsTextFile(this, path) val LOCAL_REGEX = """local\[([0-9]+)\]""".r |