diff options
-rw-r--r-- | src/scala/spark/BitTorrentBroadcast.scala | 30 | ||||
-rw-r--r-- | src/scala/spark/Broadcast.scala | 11 | ||||
-rw-r--r-- | src/scala/spark/ChainedBroadcast.scala | 18 | ||||
-rw-r--r-- | src/scala/spark/DfsBroadcast.scala | 2 |
4 files changed, 28 insertions, 33 deletions
diff --git a/src/scala/spark/BitTorrentBroadcast.scala b/src/scala/spark/BitTorrentBroadcast.scala index e8432f9143..96d3643ffd 100644 --- a/src/scala/spark/BitTorrentBroadcast.scala +++ b/src/scala/spark/BitTorrentBroadcast.scala @@ -1047,7 +1047,7 @@ extends Logging { private var initialized = false private var isMaster_ = false - private var MasterHostAddress_ = "127.0.0.1" + private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress private var MasterTrackerPort_ : Int = 11111 private var BlockSize_ : Int = 512 * 1024 private var MaxRetryCount_ : Int = 2 @@ -1079,40 +1079,38 @@ extends Logging { def initialize (isMaster__ : Boolean): Unit = { synchronized { if (!initialized) { - MasterHostAddress_ = - System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1") MasterTrackerPort_ = - System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt + System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt BlockSize_ = - System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024 + System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 MaxRetryCount_ = - System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt + System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt TrackerSocketTimeout_ = - System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt + System.getProperty ("spark.broadcast.trackerSocketTimeout", "50000").toInt ServerSocketTimeout_ = - System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt + System.getProperty ("spark.broadcast.serverSocketTimeout", "10000").toInt MinKnockInterval_ = - System.getProperty ("spark.broadcast.MinKnockInterval", "500").toInt + System.getProperty ("spark.broadcast.minKnockInterval", "500").toInt MaxKnockInterval_ = - System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt + System.getProperty ("spark.broadcast.maxKnockInterval", "999").toInt MaxPeersInGuideResponse_ = - System.getProperty ("spark.broadcast.MaxPeersInGuideResponse", "4").toInt + System.getProperty ("spark.broadcast.maxPeersInGuideResponse", "4").toInt MaxRxPeers_ = - System.getProperty ("spark.broadcast.MaxRxPeers", "4").toInt + System.getProperty ("spark.broadcast.maxRxPeers", "4").toInt MaxTxPeers_ = - System.getProperty ("spark.broadcast.MaxTxPeers", "4").toInt + System.getProperty ("spark.broadcast.maxTxPeers", "4").toInt MaxChatTime_ = - System.getProperty ("spark.broadcast.MaxChatTime", "250").toInt + System.getProperty ("spark.broadcast.maxChatTime", "250").toInt MaxChatBlocks_ = - System.getProperty ("spark.broadcast.MaxChatBlocks", "1024").toInt + System.getProperty ("spark.broadcast.maxChatBlocks", "1024").toInt EndGameFraction_ = - System.getProperty ("spark.broadcast.EndGameFraction", "1.0").toDouble + System.getProperty ("spark.broadcast.endGameFraction", "1.0").toDouble isMaster_ = isMaster__ diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index f59912b4c8..08a718540e 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -12,8 +12,6 @@ trait Broadcast[T] { // We cannot have an abstract readObject here due to some weird issues with // readObject having to be 'private' in sub-classes. Possibly a Scala bug! - def sendBroadcast: Unit - override def toString = "spark.Broadcast(" + uuid + ")" } @@ -30,11 +28,12 @@ extends Logging { // Called by SparkContext or Executor before using Broadcast def initialize (isMaster: Boolean): Unit = synchronized { if (!initialized) { - val broadcastFactoryClass = System.getProperty("spark.broadcast.Factory", - "spark.BitTorrentBroadcastFactory") + val broadcastFactoryClass = System.getProperty("spark.broadcast.factory", + "spark.DfsBroadcastFactory") val booleanArgs = Array[AnyRef] (isMaster.asInstanceOf[AnyRef]) -// broadcastFactory = Class.forName(broadcastFactoryClass).getConstructors()(0).newInstance(booleanArgs:_*).asInstanceOf[BroadcastFactory] - broadcastFactory = Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory] + + broadcastFactory = + Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory] // Initialize appropriate BroadcastFactory and BroadcastObject broadcastFactory.initialize(isMaster) diff --git a/src/scala/spark/ChainedBroadcast.scala b/src/scala/spark/ChainedBroadcast.scala index 8144148c9a..afd3c0293c 100644 --- a/src/scala/spark/ChainedBroadcast.scala +++ b/src/scala/spark/ChainedBroadcast.scala @@ -729,7 +729,7 @@ extends Logging { private var initialized = false private var isMaster_ = false - private var MasterHostAddress_ = "127.0.0.1" + private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress private var MasterTrackerPort_ : Int = 22222 private var BlockSize_ : Int = 512 * 1024 private var MaxRetryCount_ : Int = 2 @@ -745,24 +745,22 @@ extends Logging { def initialize (isMaster__ : Boolean): Unit = { synchronized { if (!initialized) { - MasterHostAddress_ = - System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1") MasterTrackerPort_ = - System.getProperty ("spark.broadcast.MasterTrackerPort", "22222").toInt + System.getProperty ("spark.broadcast.masterTrackerPort", "22222").toInt BlockSize_ = - System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024 + System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 MaxRetryCount_ = - System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt + System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt TrackerSocketTimeout_ = - System.getProperty ("spark.broadcast.TrackerSocketTimeout", "50000").toInt + System.getProperty ("spark.broadcast.trackerSocketTimeout", "50000").toInt ServerSocketTimeout_ = - System.getProperty ("spark.broadcast.ServerSocketTimeout", "10000").toInt + System.getProperty ("spark.broadcast.serverSocketTimeout", "10000").toInt MinKnockInterval_ = - System.getProperty ("spark.broadcast.MinKnockInterval", "500").toInt + System.getProperty ("spark.broadcast.minKnockInterval", "500").toInt MaxKnockInterval_ = - System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt + System.getProperty ("spark.broadcast.maxKnockInterval", "999").toInt isMaster_ = isMaster__ diff --git a/src/scala/spark/DfsBroadcast.scala b/src/scala/spark/DfsBroadcast.scala index 7b1ebce851..3178076916 100644 --- a/src/scala/spark/DfsBroadcast.scala +++ b/src/scala/spark/DfsBroadcast.scala @@ -82,7 +82,7 @@ extends Logging { conf.setInt("dfs.replication", rep) fileSystem = FileSystem.get(new URI(dfs), conf) } - workDir = System.getProperty("spark.dfs.workdir", "/tmp") + workDir = System.getProperty("spark.dfs.workDir", "/tmp") compress = System.getProperty("spark.compress", "false").toBoolean initialized = true |