diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d606432572..004592a540 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -5,8 +5,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ import akka.util.duration._ import akka.pattern.ask +import akka.util.Duration -import spark.{SparkException, Logging, TaskState} +import spark.{Utils, SparkException, Logging, TaskState} import akka.dispatch.Await import java.util.concurrent.atomic.AtomicInteger import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} @@ -24,12 +25,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor var totalCoreCount = new AtomicInteger(0) class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { - val executorActor = new HashMap[String, ActorRef] - val executorAddress = new HashMap[String, Address] - val executorHost = new HashMap[String, String] - val freeCores = new HashMap[String, Int] - val actorToExecutorId = new HashMap[ActorRef, String] - val addressToExecutorId = new HashMap[Address, String] + private val executorActor = new HashMap[String, ActorRef] + private val executorAddress = new HashMap[String, Address] + private val executorHostPort = new HashMap[String, String] + private val freeCores = new HashMap[String, Int] + private val actorToExecutorId = new HashMap[ActorRef, String] + private val addressToExecutorId = new HashMap[Address, String] override def preStart() { // Listen for remote client disconnection events, since they don't go through Akka's watch() @@ -37,7 +38,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } def receive = { - case RegisterExecutor(executorId, host, cores) => + case RegisterExecutor(executorId, hostPort, cores) => + Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorActor.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { @@ -45,7 +47,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor sender ! RegisteredExecutor(sparkProperties) context.watch(sender) executorActor(executorId) = sender - executorHost(executorId) = host + executorHostPort(executorId) = hostPort freeCores(executorId) = cores executorAddress(executorId) = sender.path.address actorToExecutorId(sender) = executorId @@ -85,13 +87,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( - executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) + executorHostPort.toArray.map {case (id, hostPort) => new WorkerOffer(id, hostPort, freeCores(id))})) } // Make fake resource offers on just one executor def makeOffers(executorId: String) { launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) + Seq(new WorkerOffer(executorId, executorHostPort(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers @@ -110,9 +112,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor actorToExecutorId -= executorActor(executorId) addressToExecutorId -= executorAddress(executorId) executorActor -= executorId - executorHost -= executorId + executorHostPort -= executorId freeCores -= executorId - executorHost -= executorId + executorHostPort -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) } @@ -128,7 +130,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor while (iterator.hasNext) { val entry = iterator.next val (key, value) = (entry.getKey.toString, entry.getValue.toString) - if (key.startsWith("spark.")) { + if (key.startsWith("spark.") && !key.equals("spark.hostPort")) { properties += ((key, value)) } } @@ -136,10 +138,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) } + private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + override def stop() { try { if (driverActor != null) { - val timeout = 5.seconds val future = driverActor.ask(StopDriver)(timeout) Await.result(future, timeout) } @@ -153,12 +156,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor driverActor ! ReviveOffers } - override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) + override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism")) + .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) // Called by subclasses when notified of a lost worker def removeExecutor(executorId: String, reason: String) { try { - val timeout = 5.seconds val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout) Await.result(future, timeout) } catch { |