aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala')
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala34
1 files changed, 18 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index a06d853b46..16131215c8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -6,8 +6,9 @@ import akka.actor._
import scala.concurrent.duration._
import akka.pattern.ask
-import spark.{SparkException, Logging, TaskState}
+import spark.{Utils, SparkException, Logging, TaskState}
import scala.concurrent.Await
+
import java.util.concurrent.atomic.AtomicInteger
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
@@ -24,12 +25,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
var totalCoreCount = new AtomicInteger(0)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
- val executorActor = new HashMap[String, ActorRef]
- val executorAddress = new HashMap[String, Address]
- val executorHost = new HashMap[String, String]
- val freeCores = new HashMap[String, Int]
- val actorToExecutorId = new HashMap[ActorRef, String]
- val addressToExecutorId = new HashMap[Address, String]
+ private val executorActor = new HashMap[String, ActorRef]
+ private val executorAddress = new HashMap[String, Address]
+ private val executorHostPort = new HashMap[String, String]
+ private val freeCores = new HashMap[String, Int]
+ private val actorToExecutorId = new HashMap[ActorRef, String]
+ private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -37,7 +38,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
def receive = {
- case RegisterExecutor(executorId, host, cores) =>
+ case RegisterExecutor(executorId, hostPort, cores) =>
+ Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorActor.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
@@ -45,7 +47,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
executorActor(executorId) = sender
- executorHost(executorId) = host
+ executorHostPort(executorId) = hostPort
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
actorToExecutorId(sender) = executorId
@@ -85,13 +87,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
- executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ executorHostPort.toArray.map {case (id, hostPort) => new WorkerOffer(id, hostPort, freeCores(id))}))
}
// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+ Seq(new WorkerOffer(executorId, executorHostPort(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
@@ -110,9 +112,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
- executorHost -= executorId
+ executorHostPort -= executorId
freeCores -= executorId
- executorHost -= executorId
+ executorHostPort -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}
@@ -128,7 +130,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
while (iterator.hasNext) {
val entry = iterator.next
val (key, value) = (entry.getKey.toString, entry.getValue.toString)
- if (key.startsWith("spark.")) {
+ if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
properties += ((key, value))
}
}
@@ -136,10 +138,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
}
+ private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+
override def stop() {
try {
if (driverActor != null) {
- val timeout = 5.seconds
val future = driverActor.ask(StopDriver)(timeout)
Await.result(future, timeout)
}
@@ -159,7 +162,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
try {
- val timeout = 5.seconds
val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
Await.result(future, timeout)
} catch {