diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala new file mode 100644 index 0000000000..ac6dc7d879 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.{Utils, Logging, SparkContext} +import org.apache.spark.deploy.client.{Client, ClientListener} +import org.apache.spark.deploy.{Command, ApplicationDescription} +import scala.collection.mutable.HashMap + +private[spark] class SparkDeploySchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + master: String, + appName: String) + extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + with ClientListener + with Logging { + + var client: Client = null + var stopping = false + var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ + + val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt + + override def start() { + super.start() + + // The endpoint for executors to talk to us + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), + StandaloneSchedulerBackend.ACTOR_NAME) + val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") + val command = Command( + "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) + val sparkHome = sc.getSparkHome().getOrElse(null) + val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, + sc.ui.appUIAddress) + + client = new Client(sc.env.actorSystem, master, appDesc, this) + client.start() + } + + override def stop() { + stopping = true + super.stop() + client.stop() + if (shutdownCallback != null) { + shutdownCallback(this) + } + } + + override def connected(appId: String) { + logInfo("Connected to Spark cluster with app ID " + appId) + } + + override def disconnected() { + if (!stopping) { + logError("Disconnected from Spark cluster!") + scheduler.error("Disconnected from Spark cluster") + } + } + + override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { + logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( + executorId, hostPort, cores, Utils.megabytesToString(memory))) + } + + override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) { + val reason: ExecutorLossReason = exitStatus match { + case Some(code) => ExecutorExited(code) + case None => SlaveLost(message) + } + logInfo("Executor %s removed: %s".format(executorId, message)) + removeExecutor(executorId, reason.toString) + } +} |