author    Reynold Xin <rxin@databricks.com>    2015-11-11 15:30:21 -0800
committer Reynold Xin <rxin@databricks.com>    2015-11-11 15:30:21 -0800
commit    e1bcf6af9ba4f131f84d71660d0ab5598c0b7b67
tree      87f0c0aa221b32068f727cc19aa23b785714aa65
parent    2d76e44b1a88e08047806972b2d241a89e499bab
[SPARK-10827] replace volatile with Atomic* in AppClient.scala.
This is a followup for #9317 to replace volatile fields with AtomicBoolean and AtomicReference.

Author: Reynold Xin <rxin@databricks.com>

Closes #9611 from rxin/SPARK-10827.
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 68
 1 file changed, 35 insertions(+), 33 deletions(-)
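The change itself is mechanical: each @volatile var becomes a val holding an Atomic* object, plain reads become .get, and plain writes become .set. A minimal, self-contained sketch of that pattern is below; the RegistrationState wrapper and its method names are illustrative only and do not appear in AppClient.scala.

import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

class RegistrationState {
  // Before: @volatile private var registered = false
  // After: an AtomicBoolean, read with get and written with set
  private val registered = new AtomicBoolean(false)

  // Before: @volatile private var appId: String = null
  // After: an AtomicReference, which starts out holding null
  private val appId = new AtomicReference[String]

  def onRegistered(id: String): Unit = {
    appId.set(id)
    registered.set(true)
  }

  def isRegistered: Boolean = registered.get

  def currentAppId: Option[String] = Option(appId.get)
}

The full diff follows.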
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3f29da663b..afab362e21 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.client
import java.util.concurrent._
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import scala.util.control.NonFatal
@@ -49,9 +50,9 @@ private[spark] class AppClient(
private val REGISTRATION_TIMEOUT_SECONDS = 20
private val REGISTRATION_RETRIES = 3
- @volatile private var endpoint: RpcEndpointRef = null
- @volatile private var appId: String = null
- @volatile private var registered = false
+ private val endpoint = new AtomicReference[RpcEndpointRef]
+ private val appId = new AtomicReference[String]
+ private val registered = new AtomicBoolean(false)
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
with Logging {
@@ -59,16 +60,17 @@ private[spark] class AppClient(
private var master: Option[RpcEndpointRef] = None
// To avoid calling listener.disconnected() multiple times
private var alreadyDisconnected = false
- @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
- @volatile private var registerMasterFutures: Array[JFuture[_]] = null
- @volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+ // To avoid calling listener.dead() multiple times
+ private val alreadyDead = new AtomicBoolean(false)
+ private val registerMasterFutures = new AtomicReference[Array[JFuture[_]]]
+ private val registrationRetryTimer = new AtomicReference[JScheduledFuture[_]]
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = new ThreadPoolExecutor(
0,
- masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+ masterRpcAddresses.length, // Make sure we can register with all masters at the same time
60L, TimeUnit.SECONDS,
new SynchronousQueue[Runnable](),
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
@@ -100,7 +102,7 @@ private[spark] class AppClient(
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
- if (registered) {
+ if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
@@ -123,22 +125,22 @@ private[spark] class AppClient(
* nthRetry means this is the nth attempt to register with master.
*/
private def registerWithMaster(nthRetry: Int) {
- registerMasterFutures = tryRegisterAllMasters()
- registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
+ registerMasterFutures.set(tryRegisterAllMasters())
+ registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
Utils.tryOrExit {
- if (registered) {
- registerMasterFutures.foreach(_.cancel(true))
+ if (registered.get) {
+ registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
- registerMasterFutures.foreach(_.cancel(true))
+ registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}
- }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
/**
@@ -163,10 +165,10 @@ private[spark] class AppClient(
// RegisteredApplications due to an unstable network.
// 2. Receive multiple RegisteredApplication from different masters because the master is
// changing.
- appId = appId_
- registered = true
+ appId.set(appId_)
+ registered.set(true)
master = Some(masterRef)
- listener.connected(appId)
+ listener.connected(appId.get)
case ApplicationRemoved(message) =>
markDead("Master removed our application: %s".format(message))
@@ -178,7 +180,7 @@ private[spark] class AppClient(
cores))
// FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
// guaranteed), `ExecutorStateChanged` may be sent to a dead master.
- sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
+ sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) =>
@@ -193,13 +195,13 @@ private[spark] class AppClient(
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
master = Some(masterRef)
alreadyDisconnected = false
- masterRef.send(MasterChangeAcknowledged(appId))
+ masterRef.send(MasterChangeAcknowledged(appId.get))
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case StopAppClient =>
markDead("Application has been stopped.")
- sendToMaster(UnregisterApplication(appId))
+ sendToMaster(UnregisterApplication(appId.get))
context.reply(true)
stop()
@@ -263,18 +265,18 @@ private[spark] class AppClient(
}
def markDead(reason: String) {
- if (!alreadyDead) {
+ if (!alreadyDead.get) {
listener.dead(reason)
- alreadyDead = true
+ alreadyDead.set(true)
}
}
override def onStop(): Unit = {
- if (registrationRetryTimer != null) {
- registrationRetryTimer.cancel(true)
+ if (registrationRetryTimer.get != null) {
+ registrationRetryTimer.get.cancel(true)
}
registrationRetryThread.shutdownNow()
- registerMasterFutures.foreach(_.cancel(true))
+ registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
askAndReplyThreadPool.shutdownNow()
}
@@ -283,19 +285,19 @@ private[spark] class AppClient(
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
- endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
+ endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
def stop() {
- if (endpoint != null) {
+ if (endpoint.get != null) {
try {
val timeout = RpcUtils.askRpcTimeout(conf)
- timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
+ timeout.awaitResult(endpoint.get.ask[Boolean](StopAppClient))
} catch {
case e: TimeoutException =>
logInfo("Stop request to Master timed out; it may already be shut down.")
}
- endpoint = null
+ endpoint.set(null)
}
}
@@ -306,8 +308,8 @@ private[spark] class AppClient(
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Boolean = {
- if (endpoint != null && appId != null) {
- endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
+ if (endpoint.get != null && appId.get != null) {
+ endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
false
@@ -319,8 +321,8 @@ private[spark] class AppClient(
* @return whether the kill request is acknowledged.
*/
def killExecutors(executorIds: Seq[String]): Boolean = {
- if (endpoint != null && appId != null) {
- endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
+ if (endpoint.get != null && appId.get != null) {
+ endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
false
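A side note on the pattern, not part of the patch: markDead above still reads the flag with get and sets it in a separate step, so it keeps the same check-then-act shape the volatile version had. AtomicBoolean also provides compareAndSet, which performs the test and the update as a single atomic operation; a hypothetical variant would look like this.

// Hypothetical variant, not in the patch: compareAndSet(false, true) flips the
// flag and returns true only for the single caller that actually flipped it.
def markDead(reason: String): Unit = {
  if (alreadyDead.compareAndSet(false, true)) {
    listener.dead(reason)
  }
}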