From cbb29fae1a9b8678ffeab3521901d61170a24dd2 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury Date: Sat, 6 Nov 2010 16:52:50 -0700 Subject: Updated log outputs for consistency with BT branches. Code formatting. --- conf/java-opts | 2 +- conf/log4j.properties | 2 +- src/scala/spark/Broadcast.scala | 339 ++++++++++++++++++++++++---------------- 3 files changed, 209 insertions(+), 134 deletions(-) diff --git a/conf/java-opts b/conf/java-opts index b61e8163b5..0307361a26 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false +-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=256 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.dualMode=false diff --git a/conf/log4j.properties b/conf/log4j.properties index d72dbadc39..33774b463d 100644 --- a/conf/log4j.properties +++ b/conf/log4j.properties @@ -2,7 +2,7 @@ log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 0e29c8aa43..b5bcdde21b 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -2,11 +2,11 @@ package spark import java.io._ import java.net._ -import java.util.{UUID, PriorityQueue, Comparator} +import java.util.{Comparator, PriorityQueue, UUID} import com.google.common.collect.MapMaker -import java.util.concurrent.{Executors, ExecutorService} +import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor} import scala.collection.mutable.Map @@ -39,11 +39,13 @@ trait BroadcastRecipe { @serializable class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) - extends BroadcastRecipe with Logging { +extends BroadcastRecipe with Logging { def value = value_ - BroadcastCS.synchronized { BroadcastCS.values.put (uuid, value_) } + BroadcastCS.synchronized { + BroadcastCS.values.put (uuid, value_) + } @transient var arrayOfBlocks: Array[BroadcastBlock] = null @transient var totalBytes = -1 @@ -68,10 +70,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // Must call this after all the variables have been created/initialized if (!local) { - val start = System.nanoTime sendBroadcast - val time = (System.nanoTime - start) / 1e9 - logInfo("sendBroadcast took " + time + " s") } def sendBroadcast () { @@ -80,20 +79,21 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) // out.writeObject (value_) // out.close + // TODO: Fix this at some point hasCopyInHDFS = true // Create a variableInfo object and store it in valueInfos - var variableInfo = blockifyObject (value_, BroadcastCS.blockSize) + var variableInfo = blockifyObject (value_, BroadcastCS.BlockSize) guideMR = new GuideMultipleRequests guideMR.setDaemon (true) guideMR.start - logInfo (System.currentTimeMillis + ": " + "GuideMultipleRequests started") + logInfo ("GuideMultipleRequests started...") serveMR = new ServeMultipleRequests serveMR.setDaemon (true) serveMR.start - logInfo (System.currentTimeMillis + ": " + "ServeMultipleRequests started") + logInfo ("ServeMultipleRequests started...") // Prepare the value being broadcasted // TODO: Refactoring and clean-up required here @@ -113,7 +113,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0) pqOfSources.add (masterSource_0) // Add one more time to have two replicas of any seeds in the PQ - if (BroadcastCS.dualMode) { + if (BroadcastCS.DualMode) { val masterSource_1 = new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 1) pqOfSources.add (masterSource_1) @@ -144,7 +144,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) serveMR = new ServeMultipleRequests serveMR.setDaemon (true) serveMR.start - logInfo (System.currentTimeMillis + ": " + "ServeMultipleRequests started") + logInfo ("ServeMultipleRequests started...") val start = System.nanoTime @@ -161,7 +161,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } val time = (System.nanoTime - start) / 1e9 - logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s") + logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s") } } } @@ -215,7 +215,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) var retByteArray = new Array[Byte] (totalBytes) for (i <- 0 until totalBlocks) { System.arraycopy (arrayOfBlocks(i).byteArray, 0, retByteArray, - i * BroadcastCS.blockSize, arrayOfBlocks(i).byteArray.length) + i * BroadcastCS.BlockSize, arrayOfBlocks(i).byteArray.length) } byteArrayToObject (retByteArray) } @@ -238,12 +238,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) var masterListenPort: Int = -1 - var retriesLeft = BroadcastCS.maxRetryCount + var retriesLeft = BroadcastCS.MaxRetryCount do { try { // Connect to the tracker to find out the guide val clientSocketToTracker = - new Socket(BroadcastCS.masterHostAddress, BroadcastCS.masterTrackerPort) + new Socket(BroadcastCS.MasterHostAddress, BroadcastCS.MasterTrackerPort) val oosTracker = new ObjectOutputStream (clientSocketToTracker.getOutputStream) oosTracker.flush @@ -258,14 +258,20 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // In case of any failure, set masterListenPort = 0 to read from HDFS case e: Exception => (masterListenPort = 0) } finally { - if (oisTracker != null) { oisTracker.close } - if (oosTracker != null) { oosTracker.close } - if (clientSocketToTracker != null) { clientSocketToTracker.close } + if (oisTracker != null) { + oisTracker.close + } + if (oosTracker != null) { + oosTracker.close + } + if (clientSocketToTracker != null) { + clientSocketToTracker.close + } } retriesLeft -= 1 // TODO: Should wait before retrying } while (retriesLeft > 0 && masterListenPort < 0) - logInfo (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + masterListenPort) + logInfo ("Got this guidePort from Tracker: " + masterListenPort) return masterListenPort } @@ -273,7 +279,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // Get masterListenPort for this variable from the Tracker val masterListenPort = getMasterListenPort (variableUUID) // If Tracker says that there is no guide for this object, read from HDFS - if (masterListenPort == 0) { return false } + if (masterListenPort == 0) { + return false + } // Wait until hostAddress and listenPort are created by the // ServeMultipleRequests thread @@ -285,12 +293,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // Connect and receive broadcast from the specified source, retrying the // specified number of times in case of failures - var retriesLeft = BroadcastCS.maxRetryCount + var retriesLeft = BroadcastCS.MaxRetryCount do { // Connect to Master and send this worker's Information val clientSocketToMaster = - new Socket(BroadcastCS.masterHostAddress, masterListenPort) - logInfo (System.currentTimeMillis + ": " + "Connected to Master's guiding object") + new Socket(BroadcastCS.MasterHostAddress, masterListenPort) + logInfo ("Connected to Master's guiding object") // TODO: Guiding object connection is reusable val oosMaster = new ObjectOutputStream (clientSocketToMaster.getOutputStream) @@ -310,14 +318,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } totalBytes = sourceInfo.totalBytes - logInfo (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) + logInfo ("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) val start = System.nanoTime val receptionSucceeded = receiveSingleTransmission (sourceInfo) val time = (System.nanoTime - start) / 1e9 // Updating some statistics in sourceInfo. Master will be using them later - if (!receptionSucceeded) { sourceInfo.receptionFailed = true } + if (!receptionSucceeded) { + sourceInfo.receptionFailed = true + } sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time // Send back statistics to the Master @@ -351,15 +361,20 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) oisSource = new ObjectInputStream (clientSocketToSource.getInputStream) - logInfo (System.currentTimeMillis + ": " + "Inside receiveSingleTransmission") - logInfo (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) + logInfo ("Inside receiveSingleTransmission") + logInfo ("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) // Send the range oosSource.writeObject((hasBlocks, totalBlocks)) oosSource.flush for (i <- hasBlocks until totalBlocks) { + val recvStartTime = System.currentTimeMillis val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] + val receptionTime = (System.currentTimeMillis - recvStartTime) + + logInfo ("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.") + arrayOfBlocks(hasBlocks) = bcBlock hasBlocks += 1 // Set to true if at least one block is received @@ -367,23 +382,29 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) hasBlocksLock.synchronized { hasBlocksLock.notifyAll } - logInfo (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock) } - logInfo (System.currentTimeMillis + ": " + "After the receive loop") + logInfo ("After the receive loop...") } catch { case e: Exception => { - logInfo (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e) + logInfo ("receiveSingleTransmission had a " + e) } } finally { - if (oisSource != null) { oisSource.close } - if (oosSource != null) { oosSource.close } - if (clientSocketToSource != null) { clientSocketToSource.close } + if (oisSource != null) { + oisSource.close + } + if (oosSource != null) { + oosSource.close + } + if (clientSocketToSource != null) { + clientSocketToSource.close + } } return receptionSucceeded } - class GuideMultipleRequests extends Thread with Logging { + class GuideMultipleRequests + extends Thread with Logging { override def run = { // TODO: Cached threadpool has 60 s keep alive timer var threadPool = Executors.newCachedThreadPool @@ -391,7 +412,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) serverSocket = new ServerSocket (0) guidePort = serverSocket.getLocalPort - logInfo (System.currentTimeMillis + ": " + "GuideMultipleRequests" + serverSocket + " " + guidePort) + logInfo ("GuideMultipleRequests => " + serverSocket + " " + guidePort) guidePortLock.synchronized { guidePortLock.notifyAll @@ -403,16 +424,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) while (keepAccepting || !hasCopyInHDFS) { var clientSocket: Socket = null try { - serverSocket.setSoTimeout (BroadcastCS.serverSocketTimout) + serverSocket.setSoTimeout (BroadcastCS.ServerSocketTimeout) clientSocket = serverSocket.accept } catch { case e: Exception => { - logInfo ("GuideMultipleRequests Timeout. Stopping listening..." + hasCopyInHDFS) + logInfo ("GuideMultipleRequests Timeout.") keepAccepting = false } } if (clientSocket != null) { - logInfo (System.currentTimeMillis + ": " + "Guide:Accepted new client connection:" + clientSocket) + logInfo ("Guide: Accepted new client connection: " + clientSocket) try { threadPool.execute (new GuideSingleRequest (clientSocket)) } catch { @@ -423,12 +444,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } BroadcastCS.unregisterValue (uuid) } finally { - serverSocket.close + if (serverSocket != null) { + logInfo ("GuideMultipleRequests now stopping...") + serverSocket.close + } } } class GuideSingleRequest (val clientSocket: Socket) - extends Runnable with Logging { + extends Thread with Logging { private val oos = new ObjectOutputStream (clientSocket.getOutputStream) oos.flush private val ois = new ObjectInputStream (clientSocket.getInputStream) @@ -436,9 +460,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) private var selectedSourceInfo: SourceInfo = null private var thisWorkerInfo:SourceInfo = null - def run = { + override def run = { try { - logInfo (System.currentTimeMillis + ": " + "new GuideSingleRequest is running") + logInfo ("new GuideSingleRequest is running") // Connecting worker is sending in its hostAddress and listenPort it will // be listening to. ReplicaID is 0 and other fields are invalid (-1) var sourceInfo = ois.readObject.asInstanceOf[SourceInfo] @@ -446,14 +470,14 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) pqOfSources.synchronized { // Select a suitable source and send it back to the worker selectedSourceInfo = selectSuitableSource (sourceInfo) - logInfo (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo) + logInfo ("Sending selectedSourceInfo:" + selectedSourceInfo) oos.writeObject (selectedSourceInfo) oos.flush // Add this new (if it can finish) source to the PQ of sources thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress, sourceInfo.listenPort, totalBlocks, totalBytes, 0) - logInfo (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo) + logInfo ("Adding possible new source to pqOfSources: " + thisWorkerInfo) pqOfSources.add (thisWorkerInfo) } @@ -481,7 +505,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) sourceInfo.hostAddress, sourceInfo.MBps) // No need to find and update thisWorkerInfo, but add its replica - if (BroadcastCS.dualMode) { + if (BroadcastCS.DualMode) { pqOfSources.add (new SourceInfo (thisWorkerInfo.hostAddress, thisWorkerInfo.listenPort, totalBlocks, totalBytes, 1)) } @@ -504,7 +528,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } // Remove thisWorkerInfo - if (pqOfSources != null) { pqOfSources.remove (thisWorkerInfo) } + if (pqOfSources != null) { + pqOfSources.remove (thisWorkerInfo) + } } } } finally { @@ -534,14 +560,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } } - class ServeMultipleRequests extends Thread with Logging { + class ServeMultipleRequests + extends Thread with Logging { override def run = { var threadPool = Executors.newCachedThreadPool var serverSocket: ServerSocket = null serverSocket = new ServerSocket (0) listenPort = serverSocket.getLocalPort - logInfo (System.currentTimeMillis + ": " + "ServeMultipleRequests" + serverSocket + " " + listenPort) + logInfo ("ServeMultipleRequests started with " + serverSocket) listenPortLock.synchronized { listenPortLock.notifyAll @@ -552,16 +579,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) while (keepAccepting) { var clientSocket: Socket = null try { - serverSocket.setSoTimeout (BroadcastCS.serverSocketTimout) + serverSocket.setSoTimeout (BroadcastCS.ServerSocketTimeout) clientSocket = serverSocket.accept } catch { case e: Exception => { - logInfo ("ServeMultipleRequests Timeout. Stopping listening...") + logInfo ("ServeMultipleRequests Timeout.") keepAccepting = false } } if (clientSocket != null) { - logInfo (System.currentTimeMillis + ": " + "Serve:Accepted new client connection:" + clientSocket) + logInfo ("Serve: Accepted new client connection: " + clientSocket) try { threadPool.execute (new ServeSingleRequest (clientSocket)) } catch { @@ -571,12 +598,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } } } finally { - serverSocket.close + if (serverSocket != null) { + logInfo ("ServeMultipleRequests now stopping...") + serverSocket.close + } } } class ServeSingleRequest (val clientSocket: Socket) - extends Runnable with Logging { + extends Thread with Logging { private val oos = new ObjectOutputStream (clientSocket.getOutputStream) oos.flush private val ois = new ObjectInputStream (clientSocket.getInputStream) @@ -584,9 +614,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) private var sendFrom = 0 private var sendUntil = totalBlocks - def run = { + override def run = { try { - logInfo (System.currentTimeMillis + ": " + "new ServeSingleRequest is running") + logInfo ("new ServeSingleRequest is running") // Receive range to send var sendRange = ois.readObject.asInstanceOf[(Int, Int)] @@ -599,10 +629,10 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // If something went wrong, e.g., the worker at the other end died etc. // then close everything up case e: Exception => { - logInfo (System.currentTimeMillis + ": " + "ServeSingleRequest had a " + e) + logInfo ("ServeSingleRequest had a " + e) } } finally { - logInfo (System.currentTimeMillis + ": " + "ServeSingleRequest is closing streams and sockets") + logInfo ("ServeSingleRequest is closing streams and sockets") ois.close oos.close clientSocket.close @@ -629,7 +659,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } catch { case e: Exception => { } } - logInfo (System.currentTimeMillis + ": " + "Send block: " + i + " " + arrayOfBlocks(i)) + logInfo ("Sent block: " + i + " to " + clientSocket) } } } @@ -638,13 +668,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) //@serializable //class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) -// extends BroadcastRecipe with Logging { +//extends BroadcastRecipe with Logging { // def value = value_ // BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) } // -// if (!local) { sendBroadcast } +// if (!local) { +// sendBroadcast +// } // // @transient var publishThread: PublishThread = null // @transient var hasCopyInHDFS = false @@ -675,7 +707,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // if (receptionSucceeded) { // value_ = BroadcastSS.values.get(uuid).asInstanceOf[T] // } else { -// logInfo (System.currentTimeMillis + ": " + "Reading from HDFS") +// logInfo ("Reading from HDFS") // val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) // value_ = fileIn.readObject.asInstanceOf[T] // BroadcastSS.values.put(uuid, value_) @@ -683,16 +715,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // } // // val time = (System.nanoTime - start) / 1e9 -// logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s") +// logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s") // } // } // } // -// class PublishThread extends Thread with Logging { +// class PublishThread +// extends Thread with Logging { // override def run = { // // TODO: Put some delay here to give time others to register // // Thread.sleep (5000) -// logInfo (System.currentTimeMillis + ": " + "Waited. Now sending...") +// logInfo ("Waited. Now sending...") // BroadcastSS.synchronized { // BroadcastSS.publishVariable[T] (uuid, value) // } @@ -702,13 +735,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) @serializable class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) - extends BroadcastRecipe with Logging { +extends BroadcastRecipe with Logging { def value = value_ - BroadcastCH.synchronized { BroadcastCH.values.put(uuid, value_) } + BroadcastCH.synchronized { + BroadcastCH.values.put(uuid, value_) + } - if (!local) { sendBroadcast } + if (!local) { + sendBroadcast + } def sendBroadcast () { val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) @@ -724,7 +761,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) if (cachedVal != null) { value_ = cachedVal.asInstanceOf[T] } else { - logInfo( System.currentTimeMillis + ": " + "Started reading Broadcasted variable " + uuid) + logInfo( "Started reading Broadcasted variable " + uuid) val start = System.nanoTime val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) @@ -733,7 +770,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) fileIn.close val time = (System.nanoTime - start) / 1e9 - logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s") + logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s") } } } @@ -742,7 +779,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) @serializable case class SourceInfo (val hostAddress: String, val listenPort: Int, val totalBlocks: Int, val totalBytes: Int, val replicaID: Int) - extends Comparable [SourceInfo] with Logging { +extends Comparable [SourceInfo] with Logging { var currentLeechers = 0 var receptionFailed = false @@ -779,7 +816,8 @@ case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock], @transient var hasBlocks = 0 } -private object Broadcast extends Logging { +private object Broadcast +extends Logging { private var initialized = false // Will be called by SparkContext or Executor before using Broadcast @@ -801,7 +839,8 @@ private object Broadcast extends Logging { } } -private object BroadcastCS extends Logging { +private object BroadcastCS +extends Logging { val values = new MapMaker ().softValues ().makeMap[UUID, Any] var valueToGuidePortMap = Map[UUID, Int] () @@ -811,12 +850,15 @@ private object BroadcastCS extends Logging { private var initialized = false private var isMaster_ = false - private var masterHostAddress_ = "127.0.0.1" - private var masterTrackerPort_ : Int = 11111 - private var blockSize_ : Int = 512 * 1024 - private var maxRetryCount_ : Int = 2 - private var serverSocketTimout_ : Int = 50000 - private var dualMode_ : Boolean = false + private var MasterHostAddress_ = "127.0.0.1" + private var MasterTrackerPort_ : Int = 11111 + private var BlockSize_ : Int = 512 * 1024 + private var MaxRetryCount_ : Int = 2 + + private var TrackerSocketTimeout_ : Int = 50000 + private var ServerSocketTimeout_ : Int = 10000 + + private var DualMode_ : Boolean = false private var trackMV: TrackMultipleValues = null @@ -828,18 +870,22 @@ private object BroadcastCS extends Logging { def initialize (isMaster__ : Boolean) { synchronized { if (!initialized) { - masterHostAddress_ = - System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1") - masterTrackerPort_ = - System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt - blockSize_ = - System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 - maxRetryCount_ = - System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt - serverSocketTimout_ = - System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt - dualMode_ = - System.getProperty ("spark.broadcast.dualMode", "false").toBoolean + MasterHostAddress_ = + System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1") + MasterTrackerPort_ = + System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt + BlockSize_ = + System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024 + MaxRetryCount_ = + System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt + + TrackerSocketTimeout_ = + System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt + ServerSocketTimeout_ = + System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt + + DualMode_ = + System.getProperty ("spark.broadcast.DualMode", "false").toBoolean isMaster_ = isMaster__ @@ -847,7 +893,7 @@ private object BroadcastCS extends Logging { trackMV = new TrackMultipleValues trackMV.setDaemon (true) trackMV.start - logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues started") + logInfo ("TrackMultipleValues started...") } initialized = true @@ -855,12 +901,15 @@ private object BroadcastCS extends Logging { } } - def masterHostAddress = masterHostAddress_ - def masterTrackerPort = masterTrackerPort_ - def blockSize = blockSize_ - def maxRetryCount = maxRetryCount_ - def serverSocketTimout = serverSocketTimout_ - def dualMode = dualMode_ + def MasterHostAddress = MasterHostAddress_ + def MasterTrackerPort = MasterTrackerPort_ + def BlockSize = BlockSize_ + def MaxRetryCount = MaxRetryCount_ + + def TrackerSocketTimeout = TrackerSocketTimeout_ + def ServerSocketTimeout = ServerSocketTimeout_ + + def DualMode = DualMode_ def isMaster = isMaster_ @@ -869,7 +918,7 @@ private object BroadcastCS extends Logging { def registerValue (uuid: UUID, guidePort: Int) = { valueToGuidePortMap.synchronized { valueToGuidePortMap += (uuid -> guidePort) - logInfo (System.currentTimeMillis + ": " + "New value registered with the Tracker " + valueToGuidePortMap) + logInfo ("New value registered with the Tracker " + valueToGuidePortMap) } } @@ -877,7 +926,7 @@ private object BroadcastCS extends Logging { valueToGuidePortMap.synchronized { // Set to 0 to make sure that people read it from HDFS valueToGuidePortMap (uuid) = 0 - logInfo (System.currentTimeMillis + ": " + "Value unregistered from the Tracker " + valueToGuidePortMap) + logInfo ("Value unregistered from the Tracker " + valueToGuidePortMap) } } @@ -895,22 +944,23 @@ private object BroadcastCS extends Logging { } } - class TrackMultipleValues extends Thread with Logging { + class TrackMultipleValues + extends Thread with Logging { var keepAccepting = true override def run = { var threadPool = Executors.newCachedThreadPool var serverSocket: ServerSocket = null - serverSocket = new ServerSocket (BroadcastCS.masterTrackerPort) - logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues" + serverSocket) + serverSocket = new ServerSocket (BroadcastCS.MasterTrackerPort) + logInfo ("TrackMultipleValues" + serverSocket) try { while (keepAccepting) { var clientSocket: Socket = null try { // TODO: - serverSocket.setSoTimeout (serverSocketTimout) + serverSocket.setSoTimeout (ServerSocketTimeout) clientSocket = serverSocket.accept } catch { case e: Exception => { @@ -937,7 +987,7 @@ private object BroadcastCS extends Logging { if (valueToGuidePortMap.contains (uuid)) { valueToGuidePortMap (uuid) } else -1 - logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort) + logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort) oos.writeObject (guidePort) } catch { case e: Exception => { } @@ -972,7 +1022,7 @@ private object BroadcastCS extends Logging { // private var masterBootHost_ = "127.0.0.1" // private var masterBootPort_ : Int = 22222 // private var blockSize_ : Int = 512 * 1024 -// private var maxRetryCount_ : Int = 2 +// private var MaxRetryCount_ : Int = 2 // // private var masterBootAddress_ : InetSocketAddress = null // private var localBindPort_ : Int = -1 @@ -995,7 +1045,7 @@ private object BroadcastCS extends Logging { // synchronized { // if (!initialized) { // masterBootHost_ = -// System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1") +// System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1") // masterBootPort_ = // System.getProperty ("spark.broadcast.masterBootPort", "22222").toInt // @@ -1004,8 +1054,8 @@ private object BroadcastCS extends Logging { // // blockSize_ = // System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 -// maxRetryCount_ = -// System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt +// MaxRetryCount_ = +// System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt // // isMaster_ = isMaster__ // @@ -1019,21 +1069,27 @@ private object BroadcastCS extends Logging { // // def masterBootAddress = masterBootAddress_ // def blockSize = blockSize_ -// def maxRetryCount = maxRetryCount_ +// def MaxRetryCount = MaxRetryCount_ // // def pEnvironment: Environment = { -// if (pEnvironment_ == null) { initializeSplitStream } +// if (pEnvironment_ == null) { +// initializeSplitStream +// } // pEnvironment_ // } // // def pastryNode: PastryNode = { -// if (pastryNode_ == null) { initializeSplitStream } +// if (pastryNode_ == null) { +// initializeSplitStream +// } // pastryNode_ // } // // def localBindPort = { // if (localBindPort_ == -1) { -// if (isMaster) { localBindPort_ = masterBootPort_ } +// if (isMaster) { +// localBindPort_ = masterBootPort_ +// } // else { // // TODO: What's the best way of finding a free port? // val sSocket = new ServerSocket (0) @@ -1092,16 +1148,22 @@ private object BroadcastCS extends Logging { // def receiveVariable[A] (uuid: UUID): Boolean = { // // TODO: Things will change if out-of-order variable recepetion is supported // -// logInfo (System.currentTimeMillis + ": " + "In receiveVariable") +// logInfo ("In receiveVariable") // // // Check in valueBytes -// if (xferValueBytesToValues[A] (uuid)) { return true } +// if (xferValueBytesToValues[A] (uuid)) { +// return true +// } // // // Check if its in progress -// for (i <- 0 until maxRetryCount) { -// logInfo (System.currentTimeMillis + ": " + uuid + " " + curUUID) -// while (uuid == curUUID) { Thread.sleep (100) } // TODO: How long to sleep -// if (xferValueBytesToValues[A] (uuid)) { return true } +// for (i <- 0 until MaxRetryCount) { +// logInfo (uuid + " " + curUUID) +// while (uuid == curUUID) { +// Thread.sleep (100) // TODO: How long to sleep +// } +// if (xferValueBytesToValues[A] (uuid)) { +// return true +// } // // // Wait for a while to see if we've reached here before xmission started // Thread.sleep (100) @@ -1111,10 +1173,14 @@ private object BroadcastCS extends Logging { // // private def xferValueBytesToValues[A] (uuid: UUID): Boolean = { // var cachedValueBytes: Array[Byte] = null -// valueBytes.synchronized { cachedValueBytes = valueBytes.get (uuid) } +// valueBytes.synchronized { +// cachedValueBytes = valueBytes.get (uuid) +// } // if (cachedValueBytes != null) { // val cachedValue = byteArrayToObject[A] (cachedValueBytes) -// values.synchronized { values.put (uuid, cachedValue) } +// values.synchronized { +// values.put (uuid, cachedValue) +// } // return true // } // return false @@ -1138,19 +1204,22 @@ private object BroadcastCS extends Logging { // private def intToByteArray (value: Int): Array[Byte] = { // var retVal = new Array[Byte] (4) -// for (i <- 0 until 4) +// for (i <- 0 until 4) { // retVal(i) = (value >> ((4 - 1 - i) * 8)).toByte +// } // return retVal // } // private def byteArrayToInt (arr: Array[Byte], offset: Int): Int = { // var retVal = 0 -// for (i <- 0 until 4) +// for (i <- 0 until 4) { // retVal += ((arr(i + offset).toInt & 0x000000FF) << ((4 - 1 - i) * 8)) +// } // return retVal // } -// class SSClient (pastryNode: PastryNode) extends SplitStreamClient +// class SSClient (pastryNode: PastryNode) +// extends SplitStreamClient // with Application { // // Magic bits: 11111100001100100100110000111111 // val magicBits = 0xFC324C3F @@ -1188,7 +1257,9 @@ private object BroadcastCS extends Logging { // // // Subscribing notifies your application when data comes through the tree // myStripes = myChannel.getStripes -// for (curStripe <- myStripes) { curStripe.subscribe (this) } +// for (curStripe <- myStripes) { +// curStripe.subscribe (this) +// } // } // // // Part of SplitStreamClient. Called when a published message is received. @@ -1215,7 +1286,7 @@ private object BroadcastCS extends Logging { // curBlockBitmap = new Array[Boolean] (curTotalBlocks) // curArrayOfBytes = new Array[Byte] (curTotalBytes) // -// logInfo (System.currentTimeMillis + ": " + curUUID + " " + curTotalBlocks + " " + curTotalBytes) +// logInfo (curUUID + " " + curTotalBlocks + " " + curTotalBytes) // } // case DATA_MSG => { // val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] ( @@ -1235,7 +1306,7 @@ private object BroadcastCS extends Logging { // System.arraycopy (blockData, 0, curArrayOfBytes, // blockIndex * blockSize, blockData.length) // -// logInfo (System.currentTimeMillis + ": " + "Got stuff for: " + blockUUID) +// logInfo ("Got stuff for: " + blockUUID) // // // Done receiving // if (curHasBlocks == curTotalBlocks) { @@ -1244,7 +1315,7 @@ private object BroadcastCS extends Logging { // valueBytes.put (curUUID, curArrayOfBytes) // } // -// logInfo (System.currentTimeMillis + ": " + "Finished reading. Stored in valueBytes") +// logInfo ("Finished reading. Stored in valueBytes") // // // RESET // curUUID = null @@ -1300,13 +1371,16 @@ private object BroadcastCS extends Logging { // myStripes(stripeID).publish (bytesToSend) // } -// /* class PublishContent extends Message { +// /* class PublishContent +// extends Message { // def getPriority: Int = { Message.MEDIUM_PRIORITY } // } */ // // // Error handling -// def joinFailed(s: Stripe) = { logInfo ("joinFailed(" + s + ")") } - +// def joinFailed(s: Stripe) = { +// logInfo ("joinFailed(" + s + ")") +// } +// // // Rest of the Application interface. NOT USED. // def deliver (id: rice.p2p.commonapi.Id, message: Message) = { } // def forward (message: RouteMessage): Boolean = false @@ -1314,7 +1388,8 @@ private object BroadcastCS extends Logging { // } //} -private object BroadcastCH extends Logging { +private object BroadcastCH +extends Logging { val values = new MapMaker ().softValues ().makeMap[UUID, Any] private var initialized = false -- cgit v1.2.3