author     CodingCat <zhunansjtu@gmail.com>   2015-08-04 14:54:11 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-08-04 14:54:11 -0700
commit     9d668b73687e697cad2ef7fd3c3ba405e9795593 (patch)
tree       e7a4ddbd92ecab1fc5388e37273f25bec3b1737b
parent     ab8ee1a3b93286a62949569615086ef5030e9fae (diff)
[SPARK-9602] remove "Akka/Actor" words from comments
https://issues.apache.org/jira/browse/SPARK-9602

Although we have hidden Akka behind the RPC interface, Akka/Actor-related comments are still spread throughout the code base. To keep the terminology consistent, this change removes the "actor"/"akka" wording from those comments.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #7936 from CodingCat/SPARK-9602 and squashes the following commits:

e8296a3 [CodingCat] remove actor words from comments
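For readers skimming the diff, a minimal sketch of the RpcEndpoint abstraction these comments now point at instead of Akka actors. It assumes the private[spark] org.apache.spark.rpc API of this period (RpcEnv, RpcEndpoint, RpcCallContext); PingEndpoint, Ping, and Pong are hypothetical names used only for illustration.

    import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

    // Hypothetical messages, for illustration only.
    case object Ping
    case class Pong(payload: String)

    // An endpoint plays the role an actor used to: it belongs to an RpcEnv
    // and handles messages one at a time.
    class PingEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      // Fire-and-forget messages.
      override def receive: PartialFunction[Any, Unit] = {
        case Ping => // nothing to reply
      }
      // Request/reply messages; the sender receives the reply as a Future.
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case Ping => context.reply(Pong("pong"))
      }
    }

    // Registration mirrors AppClient.start() further down in this diff:
    // val ref = rpcEnv.setupEndpoint("ping", new PingEndpoint(rpcEnv))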
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                       | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala                   | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala                    | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala          | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala               | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala | 6
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                       | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala                | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala                         | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala          | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala             | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/IdGenerator.scala                           | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala    | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala           | 5
-rw-r--r--  project/MimaExcludes.scala                                                             | 2
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala                 | 2
16 files changed, 27 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55e563ee96..2a56bf28d7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -794,7 +794,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
/**
* We try to reuse a single Socket to transfer accumulator updates, as they are all added
- * by the DAGScheduler's single-threaded actor anyway.
+ * by the DAGScheduler's single-threaded RpcEndpoint anyway.
*/
@transient var socket: Socket = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 53356addf6..83ccaadfe7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -73,12 +73,8 @@ class LocalSparkCluster(
def stop() {
logInfo("Shutting down local Spark cluster.")
// Stop the workers before the master so they don't get upset that it disconnected
- // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
- // This is unfortunate, but for now we just comment it out.
workerRpcEnvs.foreach(_.shutdown())
- // workerActorSystems.foreach(_.awaitTermination())
masterRpcEnvs.foreach(_.shutdown())
- // masterActorSystems.foreach(_.awaitTermination())
masterRpcEnvs.clear()
workerRpcEnvs.clear()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 7576a2985e..25ea692543 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -257,7 +257,7 @@ private[spark] class AppClient(
}
def start() {
- // Just launch an actor; it will call back into the listener.
+ // Just launch an rpcEndpoint; it will call back into the listener.
endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index cf77c86d76..70f21fbe0d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
trait LeaderElectionAgent {
- val masterActor: LeaderElectable
+ val masterInstance: LeaderElectable
def stop() {} // to avoid noops in implementations.
}
@@ -37,7 +37,7 @@ trait LeaderElectable {
}
/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
extends LeaderElectionAgent {
- masterActor.electedLeader()
+ masterInstance.electedLeader()
}
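The renamed masterInstance field is simply the LeaderElectable the agent notifies. A minimal sketch of that contract, assuming code compiled inside the private[spark] package; FakeMaster is a hypothetical implementation.

    import org.apache.spark.deploy.master.{LeaderElectable, MonarchyLeaderAgent}

    // Hypothetical LeaderElectable, only to show what masterInstance points at.
    class FakeMaster extends LeaderElectable {
      @volatile var leader = false
      override def electedLeader(): Unit = { leader = true }
      override def revokedLeadership(): Unit = { leader = false }
    }

    val master = new FakeMaster
    // The single-node agent elects its masterInstance immediately on construction.
    val agent = new MonarchyLeaderAgent(master)
    assert(master.leader)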
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 68c937188b..a952cee36e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -38,5 +38,5 @@ private[master] object MasterMessages {
case object BoundPortsRequest
- case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])
+ case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 6fdff86f66..d317206a61 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
import org.apache.spark.deploy.SparkCuratorUtil
-private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
+private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
@@ -73,10 +73,10 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElecta
private def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
status = LeadershipStatus.LEADER
- masterActor.electedLeader()
+ masterInstance.electedLeader()
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
- masterActor.revokedLeadership()
+ masterInstance.revokedLeadership()
}
}
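updateLeadershipStatus is driven by Curator's LeaderLatch callbacks, and the status flag keeps repeated callbacks from reaching the LeaderElectable. A standalone sketch of that latch/listener wiring, assuming Apache Curator is on the classpath; the ZooKeeper address and election path are placeholders.

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
    import org.apache.curator.retry.ExponentialBackoffRetry

    // Placeholder ZooKeeper connection string and retry policy.
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    val latch = new LeaderLatch(client, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      // Curator invokes these from its own threads; the Spark agent funnels both
      // into updateLeadershipStatus so masterInstance only sees real changes.
      override def isLeader(): Unit = println("gained leadership")
      override def notLeader(): Unit = println("lost leadership")
    })
    latch.start()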
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c82a7ccab5..6792d3310b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -228,7 +228,7 @@ private[deploy] class Worker(
/**
* Re-register with the master because a network failure or a master failure has occurred.
* If the re-registration attempt threshold is exceeded, the worker exits with error.
- * Note that for thread-safety this should only be called from the actor.
+ * Note that for thread-safety this should only be called from the rpcEndpoint.
*/
private def reregisterWithMaster(): Unit = {
Utils.tryOrExit {
@@ -365,7 +365,8 @@ private[deploy] class Worker(
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
case WorkDirCleanup =>
- // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+ // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker
+ // rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = executors.values.map(_.appId).toSet
val cleanupFuture = concurrent.future {
@@ -684,7 +685,7 @@ private[deploy] object Worker extends Logging {
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
- // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+ // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
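The reworded WorkDirCleanup comment describes a common endpoint pattern: push slow filesystem work onto a Future so the message loop is never blocked. A minimal sketch of that pattern, not Worker's actual code; cleanupThreadExecutor is a hypothetical dedicated ExecutionContext.

    import java.io.File
    import scala.concurrent.{ExecutionContext, Future}

    // Delete old application directories off the message-handling thread.
    def handleWorkDirCleanup(appDirs: Seq[File])
                            (implicit cleanupThreadExecutor: ExecutionContext): Future[Unit] =
      Future {
        appDirs.foreach { dir =>
          // recursively delete dir here
        }
      }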
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index fae5640b9a..735c4f0927 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -43,7 +43,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
private[deploy] def setTesting(testing: Boolean) = isTesting = testing
private var isTesting = false
- // Lets us filter events only from the worker's actor system
+ // Lets us filter events only from the worker's rpc system
private val expectedAddress = RpcAddress.fromURIString(workerUrl)
private def isWorker(address: RpcAddress) = expectedAddress == address
@@ -62,7 +62,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (isWorker(remoteAddress)) {
// This log message will never be seen
- logError(s"Lost connection to worker actor $workerUrl. Exiting.")
+ logError(s"Lost connection to worker rpc endpoint $workerUrl. Exiting.")
exitNonZero()
}
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 6ae4789459..7409ac8859 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -100,7 +100,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
val future = ask[T](message, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
- throw new SparkException("Actor returned null")
+ throw new SparkException("RpcEndpoint returned null")
}
return result
} catch {
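The renamed exception message lives inside the blocking ask helper: send a message, wait on the resulting Future within a timeout, and treat a null reply as an error. A simplified standalone version of the same pattern using plain scala.concurrent, not the real RpcEndpointRef API; askSync is a hypothetical name.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.FiniteDuration

    def askSync[T](ask: () => Future[T], timeout: FiniteDuration): T = {
      val reply = Await.result(ask(), timeout)  // block for the remote reply
      if (reply == null) {
        // Stand-in for SparkException in the real code.
        throw new IllegalStateException("RpcEndpoint returned null")
      }
      reply
    }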
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 8321037cdc..5d926377ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -162,7 +162,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[spark] object OutputCommitCoordinator {
- // This actor is used only for RPC
+ // This endpoint is used only for RPC
private[spark] class OutputCommitCoordinatorEndpoint(
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
extends RpcEndpoint with Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 26e72c0bff..626a2b7d69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress}
/**
* Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
*
- * @param executorEndpoint The ActorRef representing this executor
+ * @param executorEndpoint The RpcEndpointRef representing this executor
* @param executorAddress The network address of this executor
* @param executorHost The hostname that this executor is running on
* @param freeCores The current number of cores available for work on the executor
diff --git a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
index 17e55f7996..53934ad4ce 100644
--- a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
@@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger
/**
* A util used to get a unique generation ID. This is a wrapper around Java's
* AtomicInteger. An example usage is in BlockManager, where each BlockManager
- * instance would start an Akka actor and we use this utility to assign the Akka
- * actors unique names.
+ * instance would start an RpcEndpoint and we use this utility to assign the RpcEndpoints'
+ * unique names.
*/
private[spark] class IdGenerator {
- private var id = new AtomicInteger
+ private val id = new AtomicInteger
def next: Int = id.incrementAndGet
}
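The whole class fits in a few lines; a tiny usage sketch follows, with an illustrative endpoint-name prefix rather than the exact name BlockManager uses.

    import java.util.concurrent.atomic.AtomicInteger

    // The AtomicInteger makes next safe to call from many threads,
    // which is also why the field becomes a val in this change.
    class IdGenerator {
      private val id = new AtomicInteger
      def next: Int = id.incrementAndGet
    }

    val gen = new IdGenerator
    val endpointName = s"BlockManagerEndpoint${gen.next}"  // e.g. "BlockManagerEndpoint1"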
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
index 8c96b0e71d..4b86da5367 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
@@ -99,7 +99,7 @@ object CustomPersistenceEngine {
@volatile var lastInstance: Option[CustomPersistenceEngine] = None
}
-class CustomLeaderElectionAgent(val masterActor: LeaderElectable) extends LeaderElectionAgent {
- masterActor.electedLeader()
+class CustomLeaderElectionAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
+ masterInstance.electedLeader()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index cd24d79423..e9034e39a7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -38,12 +38,11 @@ class WorkerWatcherSuite extends SparkFunSuite {
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker")
- val otherAddress = "akka://test@4.3.2.1:1234/user/OtherActor"
- val otherAkkaAddress = RpcAddress("4.3.2.1", 1234)
+ val otherRpcAddress = RpcAddress("4.3.2.1", 1234)
val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl)
workerWatcher.setTesting(testing = true)
rpcEnv.setupEndpoint("worker-watcher", workerWatcher)
- workerWatcher.onDisconnected(otherAkkaAddress)
+ workerWatcher.onDisconnected(otherRpcAddress)
assert(!workerWatcher.isShutDown)
rpcEnv.shutdown()
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 280aac9319..b60ae784c3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -182,7 +182,7 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
+ "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint")
) ++ Seq(
// SPARK-4655 - Making Stage an Abstract class broke binary compatibility even though
// the stage class is defined as private[spark]
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 8130868fe1..304b1e8cdb 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -981,7 +981,7 @@ class SparkILoop(
// which spins off a separate thread, then print the prompt and try
// our best to look ready. The interlocking lazy vals tend to
// inter-deadlock, so we break the cycle with a single asynchronous
- // message to an actor.
+ // message to an rpcEndpoint.
if (isAsync) {
intp initialize initializedCallback()
createAsyncListener() // listens for signal to run postInitialization