Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  93
1 file changed, 93 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
new file mode 100644
index 0000000000..ac6dc7d879
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.{Logging, SparkContext, Utils}
+import org.apache.spark.deploy.{ApplicationDescription, Command}
+import org.apache.spark.deploy.client.{Client, ClientListener}
+
+private[spark] class SparkDeploySchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ master: String,
+ appName: String)
+ extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+ with ClientListener
+ with Logging {
+
+ var client: Client = null
+ var stopping = false
+  var shutdownCallback: (SparkDeploySchedulerBackend) => Unit = _
+
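+  // Cap on the total cores to acquire across the cluster; leaving spark.cores.max unset means no cap.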
+ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+ override def start() {
+ super.start()
+
+ // The endpoint for executors to talk to us
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
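+    // The {{...}} placeholders are substituted on the worker side when each executor is launched.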
+ val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val command = Command(
+ "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
+ val sparkHome = sc.getSparkHome().getOrElse(null)
+ val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
+ sc.ui.appUIAddress)
+
+ client = new Client(sc.env.actorSystem, master, appDesc, this)
+ client.start()
+ }
+
+ override def stop() {
+ stopping = true
+ super.stop()
+ client.stop()
+ if (shutdownCallback != null) {
+ shutdownCallback(this)
+ }
+ }
+
+ override def connected(appId: String) {
+ logInfo("Connected to Spark cluster with app ID " + appId)
+ }
+
+ override def disconnected() {
+ if (!stopping) {
+ logError("Disconnected from Spark cluster!")
+ scheduler.error("Disconnected from Spark cluster")
+ }
+ }
+
+ override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
+ executorId, hostPort, cores, Utils.megabytesToString(memory)))
+ }
+
+ override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
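+    // A reported exit code means the executor process itself exited; its absence is treated as a lost worker.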
+ val reason: ExecutorLossReason = exitStatus match {
+ case Some(code) => ExecutorExited(code)
+ case None => SlaveLost(message)
+ }
+ logInfo("Executor %s removed: %s".format(executorId, message))
+ removeExecutor(executorId, reason.toString)
+ }
+}
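
For context, a minimal sketch of how this backend is wired up when a SparkContext
is created against a standalone spark:// master URL. The call site and the
ClusterScheduler.initialize signature are assumptions based on the surrounding
code base of this period, not part of this commit:

  // Assumed wiring, for illustration only; sc, master and appName come from
  // the enclosing SparkContext.
  val scheduler = new ClusterScheduler(sc)
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, master, appName)
  scheduler.initialize(backend)
  // SparkContext later calls scheduler.start(), which invokes the start()
  // override above and registers the application with the master.

The shutdownCallback hook is left for callers to set; for example, a mode that
runs an in-process master and workers can use it to tear that cluster down once
the backend stops (an assumption about callers, not shown in this file).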