From 9d668b73687e697cad2ef7fd3c3ba405e9795593 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 4 Aug 2015 14:54:11 -0700 Subject: [SPARK-9602] remove "Akka/Actor" words from comments https://issues.apache.org/jira/browse/SPARK-9602 Although we have hidden Akka behind RPC interface, I found that the Akka/Actor-related comments are still spreading everywhere. To make it consistent, we shall remove "actor"/"akka" words from the comments... Author: CodingCat Closes #7936 from CodingCat/SPARK-9602 and squashes the following commits: e8296a3 [CodingCat] remove actor words from comments --- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 +- .../src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 4 ---- core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- .../scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala | 6 +++--- .../main/scala/org/apache/spark/deploy/master/MasterMessages.scala | 2 +- .../apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala | 6 +++--- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 7 ++++--- .../main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala | 4 ++-- core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 2 +- .../scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala | 2 +- .../scala/org/apache/spark/scheduler/cluster/ExecutorData.scala | 2 +- core/src/main/scala/org/apache/spark/util/IdGenerator.scala | 6 +++--- .../org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala | 4 ++-- .../scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala | 5 ++--- project/MimaExcludes.scala | 2 +- .../src/main/scala/org/apache/spark/repl/SparkILoop.scala | 2 +- 16 files changed, 27 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 55e563ee96..2a56bf28d7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -794,7 +794,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: /** * We try to reuse a single Socket to transfer accumulator updates, as they are all added - * by the DAGScheduler's single-threaded actor anyway. + * by the DAGScheduler's single-threaded RpcEndpoint anyway. */ @transient var socket: Socket = _ diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 53356addf6..83ccaadfe7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -73,12 +73,8 @@ class LocalSparkCluster( def stop() { logInfo("Shutting down local Spark cluster.") // Stop the workers before the master so they don't get upset that it disconnected - // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors! - // This is unfortunate, but for now we just comment it out. workerRpcEnvs.foreach(_.shutdown()) - // workerActorSystems.foreach(_.awaitTermination()) masterRpcEnvs.foreach(_.shutdown()) - // masterActorSystems.foreach(_.awaitTermination()) masterRpcEnvs.clear() workerRpcEnvs.clear() } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 7576a2985e..25ea692543 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -257,7 +257,7 @@ private[spark] class AppClient( } def start() { - // Just launch an actor; it will call back into the listener. + // Just launch an rpcEndpoint; it will call back into the listener. endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala index cf77c86d76..70f21fbe0d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -26,7 +26,7 @@ import org.apache.spark.annotation.DeveloperApi */ @DeveloperApi trait LeaderElectionAgent { - val masterActor: LeaderElectable + val masterInstance: LeaderElectable def stop() {} // to avoid noops in implementations. } @@ -37,7 +37,7 @@ trait LeaderElectable { } /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */ -private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable) +private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent { - masterActor.electedLeader() + masterInstance.electedLeader() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index 68c937188b..a952cee36e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -38,5 +38,5 @@ private[master] object MasterMessages { case object BoundPortsRequest - case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int]) + case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int]) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index 6fdff86f66..d317206a61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} import org.apache.spark.deploy.SparkCuratorUtil -private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable, +private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable, conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" @@ -73,10 +73,10 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElecta private def updateLeadershipStatus(isLeader: Boolean) { if (isLeader && status == LeadershipStatus.NOT_LEADER) { status = LeadershipStatus.LEADER - masterActor.electedLeader() + masterInstance.electedLeader() } else if (!isLeader && status == LeadershipStatus.LEADER) { status = LeadershipStatus.NOT_LEADER - masterActor.revokedLeadership() + masterInstance.revokedLeadership() } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index c82a7ccab5..6792d3310b 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -228,7 +228,7 @@ private[deploy] class Worker( /** * Re-register with the master because a network failure or a master failure has occurred. * If the re-registration attempt threshold is exceeded, the worker exits with error. - * Note that for thread-safety this should only be called from the actor. + * Note that for thread-safety this should only be called from the rpcEndpoint. */ private def reregisterWithMaster(): Unit = { Utils.tryOrExit { @@ -365,7 +365,8 @@ private[deploy] class Worker( if (connected) { sendToMaster(Heartbeat(workerId, self)) } case WorkDirCleanup => - // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor + // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker + // rpcEndpoint. // Copy ids so that it can be used in the cleanup thread. val appIds = executors.values.map(_.appId).toSet val cleanupFuture = concurrent.future { @@ -684,7 +685,7 @@ private[deploy] object Worker extends Logging { workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = { - // The LocalSparkCluster runs multiple local sparkWorkerX actor systems + // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala index fae5640b9a..735c4f0927 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -43,7 +43,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin private[deploy] def setTesting(testing: Boolean) = isTesting = testing private var isTesting = false - // Lets us filter events only from the worker's actor system + // Lets filter events only from the worker's rpc system private val expectedAddress = RpcAddress.fromURIString(workerUrl) private def isWorker(address: RpcAddress) = expectedAddress == address @@ -62,7 +62,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (isWorker(remoteAddress)) { // This log message will never be seen - logError(s"Lost connection to worker actor $workerUrl. Exiting.") + logError(s"Lost connection to worker rpc endpoint $workerUrl. Exiting.") exitNonZero() } } diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index 6ae4789459..7409ac8859 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -100,7 +100,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) val future = ask[T](message, timeout) val result = timeout.awaitResult(future) if (result == null) { - throw new SparkException("Actor returned null") + throw new SparkException("RpcEndpoint returned null") } return result } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 8321037cdc..5d926377ce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -162,7 +162,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private[spark] object OutputCommitCoordinator { - // This actor is used only for RPC + // This endpoint is used only for RPC private[spark] class OutputCommitCoordinatorEndpoint( override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator) extends RpcEndpoint with Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index 26e72c0bff..626a2b7d69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -22,7 +22,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress} /** * Grouping of data for an executor used by CoarseGrainedSchedulerBackend. * - * @param executorEndpoint The ActorRef representing this executor + * @param executorEndpoint The RpcEndpointRef representing this executor * @param executorAddress The network address of this executor * @param executorHost The hostname that this executor is running on * @param freeCores The current number of cores available for work on the executor diff --git a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala index 17e55f7996..53934ad4ce 100644 --- a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala +++ b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala @@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger /** * A util used to get a unique generation ID. This is a wrapper around Java's * AtomicInteger. An example usage is in BlockManager, where each BlockManager - * instance would start an Akka actor and we use this utility to assign the Akka - * actors unique names. + * instance would start an RpcEndpoint and we use this utility to assign the RpcEndpoints' + * unique names. */ private[spark] class IdGenerator { - private var id = new AtomicInteger + private val id = new AtomicInteger def next: Int = id.incrementAndGet } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala index 8c96b0e71d..4b86da5367 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala @@ -99,7 +99,7 @@ object CustomPersistenceEngine { @volatile var lastInstance: Option[CustomPersistenceEngine] = None } -class CustomLeaderElectionAgent(val masterActor: LeaderElectable) extends LeaderElectionAgent { - masterActor.electedLeader() +class CustomLeaderElectionAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent { + masterInstance.electedLeader() } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala index cd24d79423..e9034e39a7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala @@ -38,12 +38,11 @@ class WorkerWatcherSuite extends SparkFunSuite { val conf = new SparkConf() val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker") - val otherAddress = "akka://test@4.3.2.1:1234/user/OtherActor" - val otherAkkaAddress = RpcAddress("4.3.2.1", 1234) + val otherRpcAddress = RpcAddress("4.3.2.1", 1234) val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl) workerWatcher.setTesting(testing = true) rpcEnv.setupEndpoint("worker-watcher", workerWatcher) - workerWatcher.onDisconnected(otherAkkaAddress) + workerWatcher.onDisconnected(otherRpcAddress) assert(!workerWatcher.isShutDown) rpcEnv.shutdown() } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 280aac9319..b60ae784c3 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -182,7 +182,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]( "org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"), ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor") + "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint") ) ++ Seq( // SPARK-4655 - Making Stage an Abstract class broke binary compatility even though // the stage class is defined as private[spark] diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 8130868fe1..304b1e8cdb 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -981,7 +981,7 @@ class SparkILoop( // which spins off a separate thread, then print the prompt and try // our best to look ready. The interlocking lazy vals tend to // inter-deadlock, so we break the cycle with a single asynchronous - // message to an actor. + // message to an rpcEndpoint. if (isAsync) { intp initialize initializedCallback() createAsyncListener() // listens for signal to run postInitialization -- cgit v1.2.3