aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-12-13 14:36:39 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-12-13 14:36:39 -0800
commit0a5c24ae3d526bba2cc11c0c7a1a5efa5fd9a39c (patch)
tree236ddb114efdef1c65cf156890e772795770d960
parent06dc4a5148ab235a524be296179cf864d4702a15 (diff)
downloadspark-0a5c24ae3d526bba2cc11c0c7a1a5efa5fd9a39c.tar.gz
spark-0a5c24ae3d526bba2cc11c0c7a1a5efa5fd9a39c.tar.bz2
spark-0a5c24ae3d526bba2cc11c0c7a1a5efa5fd9a39c.zip
- Default broadcast mechanism is set to DfsBroadcast
- Configuration parameters are renamed to follow our convention
- Master now automatically supplies its hostAddress instead of reading from config file
- sendBroadcast has been removed from the Broadcast trait
-rw-r--r--src/scala/spark/BitTorrentBroadcast.scala30
-rw-r--r--src/scala/spark/Broadcast.scala11
-rw-r--r--src/scala/spark/ChainedBroadcast.scala18
-rw-r--r--src/scala/spark/DfsBroadcast.scala2
4 files changed, 28 insertions, 33 deletions
diff --git a/src/scala/spark/BitTorrentBroadcast.scala b/src/scala/spark/BitTorrentBroadcast.scala
index e8432f9143..96d3643ffd 100644
--- a/src/scala/spark/BitTorrentBroadcast.scala
+++ b/src/scala/spark/BitTorrentBroadcast.scala
@@ -1047,7 +1047,7 @@ extends Logging {
private var initialized = false
private var isMaster_ = false
- private var MasterHostAddress_ = "127.0.0.1"
+ private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress
private var MasterTrackerPort_ : Int = 11111
private var BlockSize_ : Int = 512 * 1024
private var MaxRetryCount_ : Int = 2
@@ -1079,40 +1079,38 @@ extends Logging {
def initialize (isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
- MasterHostAddress_ =
- System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
MasterTrackerPort_ =
- System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt
+ System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt
BlockSize_ =
- System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
+ System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
MaxRetryCount_ =
- System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
+ System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
TrackerSocketTimeout_ =
- System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt
+ System.getProperty ("spark.broadcast.trackerSocketTimeout", "50000").toInt
ServerSocketTimeout_ =
- System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt
+ System.getProperty ("spark.broadcast.serverSocketTimeout", "10000").toInt
MinKnockInterval_ =
- System.getProperty ("spark.broadcast.MinKnockInterval", "500").toInt
+ System.getProperty ("spark.broadcast.minKnockInterval", "500").toInt
MaxKnockInterval_ =
- System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
+ System.getProperty ("spark.broadcast.maxKnockInterval", "999").toInt
MaxPeersInGuideResponse_ =
- System.getProperty ("spark.broadcast.MaxPeersInGuideResponse", "4").toInt
+ System.getProperty ("spark.broadcast.maxPeersInGuideResponse", "4").toInt
MaxRxPeers_ =
- System.getProperty ("spark.broadcast.MaxRxPeers", "4").toInt
+ System.getProperty ("spark.broadcast.maxRxPeers", "4").toInt
MaxTxPeers_ =
- System.getProperty ("spark.broadcast.MaxTxPeers", "4").toInt
+ System.getProperty ("spark.broadcast.maxTxPeers", "4").toInt
MaxChatTime_ =
- System.getProperty ("spark.broadcast.MaxChatTime", "250").toInt
+ System.getProperty ("spark.broadcast.maxChatTime", "250").toInt
MaxChatBlocks_ =
- System.getProperty ("spark.broadcast.MaxChatBlocks", "1024").toInt
+ System.getProperty ("spark.broadcast.maxChatBlocks", "1024").toInt
EndGameFraction_ =
- System.getProperty ("spark.broadcast.EndGameFraction", "1.0").toDouble
+ System.getProperty ("spark.broadcast.endGameFraction", "1.0").toDouble
isMaster_ = isMaster__
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index f59912b4c8..08a718540e 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -12,8 +12,6 @@ trait Broadcast[T] {
// We cannot have an abstract readObject here due to some weird issues with
// readObject having to be 'private' in sub-classes. Possibly a Scala bug!
- def sendBroadcast: Unit
-
override def toString = "spark.Broadcast(" + uuid + ")"
}
@@ -30,11 +28,12 @@ extends Logging {
// Called by SparkContext or Executor before using Broadcast
def initialize (isMaster: Boolean): Unit = synchronized {
if (!initialized) {
- val broadcastFactoryClass = System.getProperty("spark.broadcast.Factory",
- "spark.BitTorrentBroadcastFactory")
+ val broadcastFactoryClass = System.getProperty("spark.broadcast.factory",
+ "spark.DfsBroadcastFactory")
val booleanArgs = Array[AnyRef] (isMaster.asInstanceOf[AnyRef])
-// broadcastFactory = Class.forName(broadcastFactoryClass).getConstructors()(0).newInstance(booleanArgs:_*).asInstanceOf[BroadcastFactory]
- broadcastFactory = Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
+
+ broadcastFactory =
+ Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isMaster)
diff --git a/src/scala/spark/ChainedBroadcast.scala b/src/scala/spark/ChainedBroadcast.scala
index 8144148c9a..afd3c0293c 100644
--- a/src/scala/spark/ChainedBroadcast.scala
+++ b/src/scala/spark/ChainedBroadcast.scala
@@ -729,7 +729,7 @@ extends Logging {
private var initialized = false
private var isMaster_ = false
- private var MasterHostAddress_ = "127.0.0.1"
+ private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress
private var MasterTrackerPort_ : Int = 22222
private var BlockSize_ : Int = 512 * 1024
private var MaxRetryCount_ : Int = 2
@@ -745,24 +745,22 @@ extends Logging {
def initialize (isMaster__ : Boolean): Unit = {
synchronized {
if (!initialized) {
- MasterHostAddress_ =
- System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
MasterTrackerPort_ =
- System.getProperty ("spark.broadcast.MasterTrackerPort", "22222").toInt
+ System.getProperty ("spark.broadcast.masterTrackerPort", "22222").toInt
BlockSize_ =
- System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024
+ System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
MaxRetryCount_ =
- System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
+ System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
TrackerSocketTimeout_ =
- System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt
+ System.getProperty ("spark.broadcast.trackerSocketTimeout", "50000").toInt
ServerSocketTimeout_ =
- System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt
+ System.getProperty ("spark.broadcast.serverSocketTimeout", "10000").toInt
MinKnockInterval_ =
- System.getProperty ("spark.broadcast.MinKnockInterval", "500").toInt
+ System.getProperty ("spark.broadcast.minKnockInterval", "500").toInt
MaxKnockInterval_ =
- System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt
+ System.getProperty ("spark.broadcast.maxKnockInterval", "999").toInt
isMaster_ = isMaster__
diff --git a/src/scala/spark/DfsBroadcast.scala b/src/scala/spark/DfsBroadcast.scala
index 7b1ebce851..3178076916 100644
--- a/src/scala/spark/DfsBroadcast.scala
+++ b/src/scala/spark/DfsBroadcast.scala
@@ -82,7 +82,7 @@ extends Logging {
conf.setInt("dfs.replication", rep)
fileSystem = FileSystem.get(new URI(dfs), conf)
}
- workDir = System.getProperty("spark.dfs.workdir", "/tmp")
+ workDir = System.getProperty("spark.dfs.workDir", "/tmp")
compress = System.getProperty("spark.compress", "false").toBoolean
initialized = true