aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-26 21:16:37 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-10-26 21:16:37 -0700
commit8587353b3e5dc7190fcb64dd162b6183da1208c2 (patch)
tree3387677fa5fdb605b97850ed2f297a0dae36fade
parentd70f3549ff166e0f7da3d63ff3ce195f13c9f7b5 (diff)
downloadspark-8587353b3e5dc7190fcb64dd162b6183da1208c2.tar.gz
spark-8587353b3e5dc7190fcb64dd162b6183da1208c2.tar.bz2
spark-8587353b3e5dc7190fcb64dd162b6183da1208c2.zip
Added some tuning parameters.
-rw-r--r--conf/java-opts2
-rw-r--r--src/scala/spark/Broadcast.scala113
2 files changed, 72 insertions, 43 deletions
diff --git a/conf/java-opts b/conf/java-opts
index c7fa245f47..f279d27806 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=256 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000
+-Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=11111 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.ServerSocketTimout=50000
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 83849569b4..45a3685c7c 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -2,7 +2,7 @@ package spark
import java.io._
import java.net._
-import java.util.{BitSet, Comparator, Timer, TimerTask, UUID}
+import java.util.{BitSet, Comparator, Random, Timer, TimerTask, UUID}
import com.google.common.collect.MapMaker
@@ -77,7 +77,7 @@ extends BroadcastRecipe with Logging {
hasCopyInHDFS = true
// Create a variableInfo object and store it in valueInfos
- var variableInfo = blockifyObject (value_, BroadcastBT.blockSize)
+ var variableInfo = blockifyObject (value_, BroadcastBT.BlockSize)
// Prepare the value being broadcasted
// TODO: Refactoring and clean-up required here
@@ -227,7 +227,7 @@ extends BroadcastRecipe with Logging {
var retByteArray = new Array[Byte] (totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy (arrayOfBlocks(i).byteArray, 0, retByteArray,
- i * BroadcastBT.blockSize, arrayOfBlocks(i).byteArray.length)
+ i * BroadcastBT.BlockSize, arrayOfBlocks(i).byteArray.length)
}
byteArrayToObject (retByteArray)
}
@@ -306,9 +306,9 @@ extends BroadcastRecipe with Logging {
oosGuide.close
clientSocketToGuide.close
- // TODO: DO NOT use constant sleep value here
- // TODO: Guide should send back a backoff time, somehow
- Thread.sleep (100)
+ Thread.sleep ( BroadcastBT.ranGen.nextInt (
+ BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) +
+ BroadcastBT.MinKnockInterval)
}
}
}
@@ -321,12 +321,12 @@ extends BroadcastRecipe with Logging {
var gInfo: SourceInfo = SourceInfo ("", SourceInfo.TxOverGoToHDFS,
SourceInfo.UnusedParam, SourceInfo.UnusedParam)
- var retriesLeft = BroadcastBT.maxRetryCount
+ var retriesLeft = BroadcastBT.MaxRetryCount
do {
try {
// Connect to the tracker to find out GuideInfo
val clientSocketToTracker =
- new Socket(BroadcastBT.masterHostAddress, BroadcastBT.masterTrackerPort)
+ new Socket(BroadcastBT.MasterHostAddress, BroadcastBT.MasterTrackerPort)
val oosTracker =
new ObjectOutputStream (clientSocketToTracker.getOutputStream)
oosTracker.flush
@@ -350,9 +350,12 @@ extends BroadcastRecipe with Logging {
clientSocketToTracker.close
}
}
- retriesLeft -= 1
- // TODO: Should wait before retrying. Implement wait function.
- Thread.sleep (1234)
+
+ Thread.sleep ( BroadcastBT.ranGen.nextInt (
+ BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) +
+ BroadcastBT.MinKnockInterval)
+
+ retriesLeft -= 1
} while (retriesLeft > 0 && gInfo.listenPort == SourceInfo.TxNotStartedRetry)
logInfo ("Got this guidePort from Tracker: " + gInfo.listenPort)
@@ -596,7 +599,7 @@ extends BroadcastRecipe with Logging {
while (keepAccepting || !hasCopyInHDFS) {
var clientSocket: Socket = null
try {
- serverSocket.setSoTimeout (BroadcastBT.serverSocketTimout)
+ serverSocket.setSoTimeout (BroadcastBT.ServerSocketTimout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
@@ -711,7 +714,7 @@ extends BroadcastRecipe with Logging {
while (keepAccepting) {
var clientSocket: Socket = null
try {
- serverSocket.setSoTimeout (BroadcastBT.serverSocketTimout)
+ serverSocket.setSoTimeout (BroadcastBT.ServerSocketTimout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
@@ -740,9 +743,6 @@ extends BroadcastRecipe with Logging {
class ServeSingleRequest (val clientSocket: Socket)
extends Thread with Logging {
- // TODO: This has to be fixed somehow
- private val MAX_MILLIS_TO_CHAT = 50
-
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
@@ -775,14 +775,17 @@ extends BroadcastRecipe with Logging {
val startTime = System.currentTimeMillis
var curTime = startTime
var keepSending = true
+ var blocksToSend = BroadcastBT.MaxChatBlocks
- while (keepSending && curTime - startTime < MAX_MILLIS_TO_CHAT) {
+ while (keepSending && blocksToSend > 0 &&
+ (curTime - startTime) < BroadcastBT.MaxChatTime) {
val sentBlock = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
if (sentBlock < 0) {
keepSending = false
} else {
rxSourceInfo.hasBlocksBitVector.set (sentBlock)
}
+ blocksToSend = blocksToSend - 1
curTime = System.currentTimeMillis
}
} catch {
@@ -808,8 +811,6 @@ extends BroadcastRecipe with Logging {
var minCopies = Int.MaxValue
var nextIndex = -1
- logInfo ("Picking a block to send...")
-
// Figure out which blocks the receiver doesn't have
var tempHasBlocksBitVector =
rxHasBlocksBitVector.clone.asInstanceOf[BitSet]
@@ -945,16 +946,19 @@ extends Logging {
var valueToGuideMap = Map[UUID, SourceInfo] ()
- var sourceToSpeedMap = Map[String, Double] ()
+// var sourceToSpeedMap = Map[String, Double] ()
+
+ // Random number generator
+ var ranGen = new Random
private var initialized = false
private var isMaster_ = false
- private var masterHostAddress_ = "127.0.0.1"
- private var masterTrackerPort_ : Int = 11111
- private var blockSize_ : Int = 512 * 1024
- private var maxRetryCount_ : Int = 2
- private var serverSocketTimout_ : Int = 50000
+ private var MasterHostAddress_ = "127.0.0.1"
+ private var MasterTrackerPort_ : Int = 11111
+ private var BlockSize_ : Int = 512 * 1024
+ private var MaxRetryCount_ : Int = 2
+ private var ServerSocketTimout_ : Int = 50000
private var trackMV: TrackMultipleValues = null
@@ -963,24 +967,38 @@ extends Logging {
// 125.0 MBps = 1 Gbps link
private val MaxMBps_ = 125.0
+ // A peer syncs back to Guide after waiting randomly within following limits
+ private var MinKnockInterval_ = 500
+ private var MaxKnockInterval_ = 999
+
private var MaxPeersInGuideResponse_ = 4
+ // Maximum number of receiving and sending threads of a peer
private var MaxRxPeers_ = 4
private var MaxTxPeers_ = 4
+
+ // Peers can char at most this milliseconds or transfer this number of blocks
+ private var MaxChatTime_ = 250
+ private var MaxChatBlocks_ = 1024
def initialize (isMaster__ : Boolean) {
synchronized {
if (!initialized) {
- masterHostAddress_ =
- System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1")
- masterTrackerPort_ =
- System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt
- blockSize_ =
- System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
- maxRetryCount_ =
- System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
- serverSocketTimout_ =
- System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
+ MasterHostAddress_ =
+ System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
+ MasterTrackerPort_ =
+ System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt
+ BlockSize_ =
+ System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
+ MaxRetryCount_ =
+ System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
+ ServerSocketTimout_ =
+ System.getProperty ("spark.broadcast.ServerSocketTimout", "50000").toInt
+
+ MinKnockInterval_ =
+ System.getProperty ("spark.broadcast.MinKnockInterval", "100").toInt
+ MaxKnockInterval_ =
+ System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
MaxPeersInGuideResponse_ =
System.getProperty ("spark.broadcast.MaxPeersInGuideResponse", "4").toInt
@@ -990,6 +1008,11 @@ extends Logging {
MaxTxPeers_ =
System.getProperty ("spark.broadcast.MaxTxPeers", "4").toInt
+ MaxChatTime_ =
+ System.getProperty ("spark.broadcast.MaxChatTime", "4").toInt
+ MaxChatBlocks_ =
+ System.getProperty ("spark.broadcast.MaxChatBlocks", "4").toInt
+
isMaster_ = isMaster__
if (isMaster) {
@@ -1004,21 +1027,27 @@ extends Logging {
}
}
- def masterHostAddress = masterHostAddress_
- def masterTrackerPort = masterTrackerPort_
- def blockSize = blockSize_
- def maxRetryCount = maxRetryCount_
- def serverSocketTimout = serverSocketTimout_
+ def MasterHostAddress = MasterHostAddress_
+ def MasterTrackerPort = MasterTrackerPort_
+ def BlockSize = BlockSize_
+ def MaxRetryCount = MaxRetryCount_
+ def ServerSocketTimout = ServerSocketTimout_
def isMaster = isMaster_
def MaxMBps = MaxMBps_
+ def MinKnockInterval = MinKnockInterval_
+ def MaxKnockInterval = MaxKnockInterval_
+
def MaxPeersInGuideResponse = MaxPeersInGuideResponse_
def MaxRxPeers = MaxRxPeers_
def MaxTxPeers = MaxTxPeers_
+ def MaxChatTime = MaxChatTime_
+ def MaxChatBlocks = MaxChatBlocks_
+
def registerValue (uuid: UUID, gInfo: SourceInfo) = {
valueToGuideMap.synchronized {
valueToGuideMap += (uuid -> gInfo)
@@ -1059,7 +1088,7 @@ extends Logging {
var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null
- serverSocket = new ServerSocket (BroadcastBT.masterTrackerPort)
+ serverSocket = new ServerSocket (BroadcastBT.MasterTrackerPort)
logInfo ("TrackMultipleValues" + serverSocket)
try {
@@ -1067,7 +1096,7 @@ extends Logging {
var clientSocket: Socket = null
try {
// TODO:
- serverSocket.setSoTimeout (serverSocketTimout)
+ serverSocket.setSoTimeout (ServerSocketTimout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {