aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/broadcast/MultiTracker.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/broadcast/MultiTracker.scala')
-rw-r--r--core/src/main/scala/spark/broadcast/MultiTracker.scala35
1 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index 5e76dedb94..3fd77af73f 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -23,25 +23,24 @@ extends Logging {
var ranGen = new Random
private var initialized = false
- private var isMaster_ = false
+ private var _isDriver = false
private var stopBroadcast = false
private var trackMV: TrackMultipleValues = null
- def initialize(isMaster__ : Boolean) {
+ def initialize(__isDriver: Boolean) {
synchronized {
if (!initialized) {
+ _isDriver = __isDriver
- isMaster_ = isMaster__
-
- if (isMaster) {
+ if (isDriver) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
- // Set masterHostAddress to the master's IP address for the slaves to read
- System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
+ // Set DriverHostAddress to the driver's IP address for the slaves to read
+ System.setProperty("spark.MultiTracker.DriverHostAddress", Utils.localIpAddress)
}
initialized = true
@@ -54,10 +53,10 @@ extends Logging {
}
// Load common parameters
- private var MasterHostAddress_ = System.getProperty(
- "spark.MultiTracker.MasterHostAddress", "")
- private var MasterTrackerPort_ = System.getProperty(
- "spark.broadcast.masterTrackerPort", "11111").toInt
+ private var DriverHostAddress_ = System.getProperty(
+ "spark.MultiTracker.DriverHostAddress", "")
+ private var DriverTrackerPort_ = System.getProperty(
+ "spark.broadcast.driverTrackerPort", "11111").toInt
private var BlockSize_ = System.getProperty(
"spark.broadcast.blockSize", "4096").toInt * 1024
private var MaxRetryCount_ = System.getProperty(
@@ -91,11 +90,11 @@ extends Logging {
private var EndGameFraction_ = System.getProperty(
"spark.broadcast.endGameFraction", "0.95").toDouble
- def isMaster = isMaster_
+ def isDriver = _isDriver
// Common config params
- def MasterHostAddress = MasterHostAddress_
- def MasterTrackerPort = MasterTrackerPort_
+ def DriverHostAddress = DriverHostAddress_
+ def DriverTrackerPort = DriverTrackerPort_
def BlockSize = BlockSize_
def MaxRetryCount = MaxRetryCount_
@@ -123,7 +122,7 @@ extends Logging {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
- serverSocket = new ServerSocket(MasterTrackerPort)
+ serverSocket = new ServerSocket(DriverTrackerPort)
logInfo("TrackMultipleValues started at " + serverSocket)
try {
@@ -235,7 +234,7 @@ extends Logging {
try {
// Connect to the tracker to find out GuideInfo
clientSocketToTracker =
- new Socket(MultiTracker.MasterHostAddress, MultiTracker.MasterTrackerPort)
+ new Socket(MultiTracker.DriverHostAddress, MultiTracker.DriverTrackerPort)
oosTracker =
new ObjectOutputStream(clientSocketToTracker.getOutputStream)
oosTracker.flush()
@@ -276,7 +275,7 @@ extends Logging {
}
def registerBroadcast(id: Long, gInfo: SourceInfo) {
- val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
+ val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
@@ -303,7 +302,7 @@ extends Logging {
}
def unregisterBroadcast(id: Long) {
- val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
+ val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)