diff options
author | CrazyJvm <crazyjvm@gmail.com> | 2014-08-18 09:34:36 -0700 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2014-08-18 09:34:36 -0700 |
commit | c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6 (patch) | |
tree | 7d6bbedf9c1d05bd34ec11d36c42342583184cdf /core | |
parent | 9306b8c6c8c412b9d0d5cffb6bd7a87784f0f6bf (diff) | |
download | spark-c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6.tar.gz spark-c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6.tar.bz2 spark-c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6.zip |
SPARK-3093 : masterLock in Worker is no longer need
there's no need to use masterLock in Worker now since all communications are within Akka actor
Author: CrazyJvm <crazyjvm@gmail.com>
Closes #2008 from CrazyJvm/no-need-master-lock and squashes the following commits:
dd39e20 [CrazyJvm] fix format
58e7fa5 [CrazyJvm] there's no need to use masterLock now since all communications are within Akka actor
Diffstat (limited to 'core')
-rwxr-xr-x | core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 41 |
1 files changed, 14 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 80fde7e4b2..81400af22c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -72,7 +72,6 @@ private[spark] class Worker( val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) val testing: Boolean = sys.props.contains("spark.testing") - val masterLock: Object = new Object() var master: ActorSelection = null var masterAddress: Address = null var activeMasterUrl: String = "" @@ -145,18 +144,16 @@ private[spark] class Worker( } def changeMaster(url: String, uiUrl: String) { - masterLock.synchronized { - activeMasterUrl = url - activeMasterWebUiUrl = uiUrl - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = activeMasterUrl match { - case Master.sparkUrlRegex(_host, _port) => - Address("akka.tcp", Master.systemName, _host, _port.toInt) - case x => - throw new SparkException("Invalid spark URL: " + x) - } - connected = true + activeMasterUrl = url + activeMasterWebUiUrl = uiUrl + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(_host, _port) => + Address("akka.tcp", Master.systemName, _host, _port.toInt) + case x => + throw new SparkException("Invalid spark URL: " + x) } + connected = true } def tryRegisterAllMasters() { @@ -199,9 +196,7 @@ private[spark] class Worker( } case SendHeartbeat => - masterLock.synchronized { - if (connected) { master ! Heartbeat(workerId) } - } + if (connected) { master ! Heartbeat(workerId) } case WorkDirCleanup => // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor @@ -244,9 +239,7 @@ private[spark] class Worker( manager.start() coresUsed += cores_ memoryUsed += memory_ - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) - } + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } catch { case e: Exception => { logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) @@ -254,17 +247,13 @@ private[spark] class Worker( executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) - } + master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) } } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) - } + master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) val fullId = appId + "/" + execId if (ExecutorState.isFinished(state)) { executors.get(fullId) match { @@ -330,9 +319,7 @@ private[spark] class Worker( case _ => logDebug(s"Driver $driverId changed state to $state") } - masterLock.synchronized { - master ! DriverStateChanged(driverId, state, exception) - } + master ! DriverStateChanged(driverId, state, exception) val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver memoryUsed -= driver.driverDesc.mem |