path: root/core
author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-27 23:17:20 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-27 23:17:20 -0800
commit    909850729ec59b788645575fdc03df7cc51fe42b (patch)
tree      4fecb98a71bc20032ae87b7774f24017b0d96312 /core
parent    44b4a0f88fcb31727347b755ae8ec14d69571b52 (diff)
Rename more things from slave to executor
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala                  |  2
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala            | 12
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala              |  4
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 16
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala    | 16
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala  | 48
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala                        |  2
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala                          | 10
8 files changed, 50 insertions, 60 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index af3acfecb6..f5ff267d44 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -65,7 +65,7 @@ private[spark] class ExecutorRunner(
}
}
- /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
+ /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => hostname
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 435ee5743e..50871802ea 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -8,10 +8,10 @@ import akka.actor.{ActorRef, Actor, Props}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterSlaveFailed
-import spark.scheduler.cluster.RegisterSlave
+import spark.scheduler.cluster.RegisterExecutorFailed
+import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
@@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend(
try {
logInfo("Connecting to master: " + masterUrl)
master = context.actorFor(masterUrl)
- master ! RegisterSlave(executorId, hostname, cores)
+ master ! RegisterExecutor(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
@@ -41,11 +41,11 @@ private[spark] class StandaloneExecutorBackend(
}
override def receive = {
- case RegisteredSlave(sparkProperties) =>
+ case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with master")
executor.initialize(executorId, hostname, sparkProperties)
- case RegisterSlaveFailed(message) =>
+ case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
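
Illustrative only, not part of the commit: an actor-free sketch of how the backend treats the two renamed replies. The case classes mirror RegisteredExecutor and RegisterExecutorFailed from StandaloneClusterMessage.scala below; println stands in for the real logging.

object RegistrationReplySketch {
  // Simplified stand-ins for the renamed registration replies.
  sealed trait RegistrationReply
  case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) extends RegistrationReply
  case class RegisterExecutorFailed(message: String) extends RegistrationReply

  // What the backend's receive block does with each reply, minus the Akka plumbing:
  // a success carries the master's spark properties, a failure is fatal.
  def handleReply(reply: RegistrationReply): Unit = reply match {
    case RegisteredExecutor(props) =>
      println("Successfully registered with master; got " + props.size + " spark properties")
    case RegisterExecutorFailed(message) =>
      println("Executor registration failed: " + message)
      sys.exit(1)
  }

  def main(args: Array[String]): Unit =
    handleReply(RegisteredExecutor(Seq("spark.app.name" -> "demo")))
}
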
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
deleted file mode 100644
index 96ebaa4601..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package spark.scheduler.cluster
-
-private[spark]
-class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index f0792c1b76..6dd3ae003d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,7 +19,6 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
- val executorIdToSlaveId = new HashMap[String, String]
// Memory used by each executor (in megabytes)
val executorMemory = {
@@ -47,7 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
}
override def stop() {
- stopping = true;
+ stopping = true
super.stop()
client.stop()
if (shutdownCallback != null) {
@@ -67,23 +66,16 @@ private[spark] class SparkDeploySchedulerBackend(
}
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
- executorIdToSlaveId += id -> workerId
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
id, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
+ def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
}
- logInfo("Executor %s removed: %s".format(id, message))
- executorIdToSlaveId.get(id) match {
- case Some(slaveId) =>
- executorIdToSlaveId.remove(id)
- scheduler.executorLost(slaveId, reason)
- case None =>
- logInfo("No slave ID known for executor %s".format(id))
- }
+ logInfo("Executor %s removed: %s".format(executorId, message))
+ scheduler.executorLost(executorId, reason)
}
}
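
For illustration (not part of the commit): the exit-status-to-loss-reason mapping that executorRemoved now applies directly, sketched with local stand-ins for ExecutorLossReason, ExecutorExited, and SlaveLost.

object LossReasonSketch {
  sealed trait ExecutorLossReason
  case class ExecutorExited(code: Int) extends ExecutorLossReason
  case class SlaveLost(message: String) extends ExecutorLossReason

  // Mirrors the pattern match in executorRemoved: a known exit code wins,
  // otherwise the free-form message is wrapped as a generic loss.
  def lossReason(exitStatus: Option[Int], message: String): ExecutorLossReason =
    exitStatus match {
      case Some(code) => ExecutorExited(code)
      case None       => SlaveLost(message)
    }

  def main(args: Array[String]): Unit = {
    println(lossReason(Some(137), "killed"))   // ExecutorExited(137)
    println(lossReason(None, "worker lost"))   // SlaveLost(worker lost)
  }
}
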
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 1386cd9d44..c68f15bdfa 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -11,24 +11,26 @@ private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark]
-case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
private[spark]
-case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-// Slaves to master
+// Executors to master
private[spark]
-case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class RegisterExecutor(executorId: String, host: String, cores: Int)
+ extends StandaloneClusterMessage
private[spark]
-case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
}
}
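
A self-contained sketch of the renamed StatusUpdate message and its ByteBuffer factory (illustrative; SerializableBuffer and TaskState are stubbed with simplified stand-ins rather than the real spark classes).

import java.nio.ByteBuffer

object StatusUpdateSketch {
  // Simplified stand-ins for the real spark types.
  class SerializableBuffer(val buffer: ByteBuffer)
  type TaskState = String

  case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)

  object StatusUpdate {
    // Alternate factory that wraps a raw ByteBuffer, as in the diff above.
    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate =
      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
  }

  def main(args: Array[String]): Unit = {
    val update = StatusUpdate("exec-1", 42L, "FINISHED", ByteBuffer.allocate(0))
    println(update.executorId + " task " + update.taskId + " -> " + update.state)
  }
}
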
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 32be1e7a26..69822f568c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -24,9 +24,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
var totalCoreCount = new AtomicInteger(0)
class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
- val slaveActor = new HashMap[String, ActorRef]
- val slaveAddress = new HashMap[String, Address]
- val slaveHost = new HashMap[String, String]
+ val executorActor = new HashMap[String, ActorRef]
+ val executorAddress = new HashMap[String, Address]
+ val executorHost = new HashMap[String, String]
val freeCores = new HashMap[String, Int]
val actorToExecutorId = new HashMap[ActorRef, String]
val addressToExecutorId = new HashMap[Address, String]
@@ -37,17 +37,17 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
def receive = {
- case RegisterSlave(executorId, host, cores) =>
- if (slaveActor.contains(executorId)) {
- sender ! RegisterSlaveFailed("Duplicate executor ID: " + executorId)
+ case RegisterExecutor(executorId, host, cores) =>
+ if (executorActor.contains(executorId)) {
+ sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
- sender ! RegisteredSlave(sparkProperties)
+ sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
- slaveActor(executorId) = sender
- slaveHost(executorId) = host
+ executorActor(executorId) = sender
+ executorHost(executorId) = host
freeCores(executorId) = cores
- slaveAddress(executorId) = sender.path.address
+ executorAddress(executorId) = sender.path.address
actorToExecutorId(sender) = executorId
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
@@ -69,45 +69,45 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
context.stop(self)
case Terminated(actor) =>
- actorToExecutorId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
+ actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case RemoteClientDisconnected(transport, address) =>
- addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
case RemoteClientShutdown(transport, address) =>
- addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
- // Make fake resource offers on all slaves
+ // Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
- slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
- // Make fake resource offers on just one slave
+ // Make fake resource offers on just one executor
def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(executorId, slaveHost(executorId), freeCores(executorId)))))
+ Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
- slaveActor(task.executorId) ! LaunchTask(task)
+ executorActor(task.executorId) ! LaunchTask(task)
}
}
// Remove a disconnected slave from the cluster
- def removeSlave(executorId: String, reason: String) {
+ def removeExecutor(executorId: String, reason: String) {
logInfo("Slave " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
- actorToExecutorId -= slaveActor(executorId)
- addressToExecutorId -= slaveAddress(executorId)
- slaveActor -= executorId
- slaveHost -= executorId
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
freeCores -= executorId
- slaveHost -= executorId
+ executorHost -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}
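
Illustrative only: the bookkeeping shape of the renamed removeExecutor, sketched with plain mutable maps outside the actor. (The diff keeps the pre-existing duplicate removal from the host map, which is harmless since removing an absent key is a no-op.)

import scala.collection.mutable.HashMap

object RemoveExecutorSketch {
  // Simplified stand-ins for the actor's bookkeeping maps.
  val executorHost = new HashMap[String, String]
  val freeCores    = new HashMap[String, Int]
  var totalCores   = 0

  // Mirrors the shape of removeExecutor: drop every record of the executor
  // and give its cores back to the global count.
  def removeExecutor(executorId: String, reason: String): Unit = {
    println("Executor " + executorId + " disconnected, so removing it (" + reason + ")")
    val numCores = freeCores.getOrElse(executorId, 0)
    executorHost -= executorId
    freeCores    -= executorId
    totalCores   -= numCores
  }

  def main(args: Array[String]): Unit = {
    executorHost("exec-1") = "worker-1"
    freeCores("exec-1") = 4
    totalCores = 4
    removeExecutor("exec-1", "remote Akka client disconnected")
    println("executors left: " + executorHost.size + ", total cores: " + totalCores)
  }
}
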
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index b7423c7234..956ede201e 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -21,6 +21,8 @@ object BlockManagerUI extends Logging {
def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
try {
+ // TODO: This needs to find a random free port to bind to. Unfortunately, there's no way
+ // in spray to do that, so we'll have to rely on something like new ServerSocket()
val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0",
Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt,
webUIDirectives.handler, "BlockManagerHTTPServer")
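
The TODO above hints at the usual JVM workaround: bind a java.net.ServerSocket to port 0 so the OS picks a free port, read the number back, and hand it to the HTTP server. A minimal sketch, not part of this commit; note the small race window between closing the probe socket and rebinding the port.

import java.net.ServerSocket

object FreePortSketch {
  // Ask the OS for an ephemeral port, then release it so another server can bind it.
  def findFreePort(): Int = {
    val socket = new ServerSocket(0)   // port 0 means "any free port"
    try socket.getLocalPort
    finally socket.close()
  }

  def main(args: Array[String]): Unit =
    println("Would start the BlockManager UI on port " + findFreePort())
}
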
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 139e21d09e..721c4c6029 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -14,18 +14,16 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
val task = new TimerTask {
def run() {
try {
- if (delaySeconds > 0) {
- cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
- logInfo("Ran metadata cleaner for " + name)
- }
+ cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+ logInfo("Ran metadata cleaner for " + name)
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
- if (periodSeconds > 0) {
- logInfo(
+ if (delaySeconds > 0) {
+ logDebug(
"Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)