aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCrazyJvm <crazyjvm@gmail.com>2014-08-18 09:34:36 -0700
committerAaron Davidson <aaron@databricks.com>2014-08-18 09:34:36 -0700
commitc0cbbdeaf4f2033be03d32e3ea0288812b4edbf6 (patch)
tree7d6bbedf9c1d05bd34ec11d36c42342583184cdf /core
parent9306b8c6c8c412b9d0d5cffb6bd7a87784f0f6bf (diff)
downloadspark-c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6.tar.gz
spark-c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6.tar.bz2
spark-c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6.zip
SPARK-3093 : masterLock in Worker is no longer need
there's no need to use masterLock in Worker now since all communications are within Akka actor Author: CrazyJvm <crazyjvm@gmail.com> Closes #2008 from CrazyJvm/no-need-master-lock and squashes the following commits: dd39e20 [CrazyJvm] fix format 58e7fa5 [CrazyJvm] there's no need to use masterLock now since all communications are within Akka actor
Diffstat (limited to 'core')
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala41
1 files changed, 14 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 80fde7e4b2..81400af22c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -72,7 +72,6 @@ private[spark] class Worker(
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
val testing: Boolean = sys.props.contains("spark.testing")
- val masterLock: Object = new Object()
var master: ActorSelection = null
var masterAddress: Address = null
var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
}
def changeMaster(url: String, uiUrl: String) {
- masterLock.synchronized {
- activeMasterUrl = url
- activeMasterWebUiUrl = uiUrl
- master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = activeMasterUrl match {
- case Master.sparkUrlRegex(_host, _port) =>
- Address("akka.tcp", Master.systemName, _host, _port.toInt)
- case x =>
- throw new SparkException("Invalid spark URL: " + x)
- }
- connected = true
+ activeMasterUrl = url
+ activeMasterWebUiUrl = uiUrl
+ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(_host, _port) =>
+ Address("akka.tcp", Master.systemName, _host, _port.toInt)
+ case x =>
+ throw new SparkException("Invalid spark URL: " + x)
}
+ connected = true
}
def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@ private[spark] class Worker(
}
case SendHeartbeat =>
- masterLock.synchronized {
- if (connected) { master ! Heartbeat(workerId) }
- }
+ if (connected) { master ! Heartbeat(workerId) }
case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,9 +239,7 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
- }
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -254,17 +247,13 @@ private[spark] class Worker(
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
- }
+ master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
}
}
}
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
- }
+ master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
@@ -330,9 +319,7 @@ private[spark] class Worker(
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
- masterLock.synchronized {
- master ! DriverStateChanged(driverId, state, exception)
- }
+ master ! DriverStateChanged(driverId, state, exception)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem