/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: String, executorId: String, hostname: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need // to be changed so that we don't share the serializer instance across threads private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() override def onStart() { logInfo("Connecting to driver: " + driverUrl) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => // Always receive `true`. Just ignore it case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e) }(ThreadUtils.sameThread) } def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) .map(e => (e._1.substring(prefix.length).toLowerCase, e._2)) } override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") try { executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) } catch { case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } case RegisterExecutorFailed(message) => exitExecutor(1, "Slave registration failed: " + message) case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) } case KillTask(taskId, _, interruptThread) => if (executor == null) { exitExecutor(1, "Received KillTask command but executor was null") } else { executor.killTask(taskId, interruptThread) } case StopExecutor => stopping.set(true) logInfo("Driver commanded a shutdown") // Cannot shutdown here because an ack may need to be sent back to the caller. So send // a message to self to actually do the shutdown. self.send(Shutdown) case Shutdown => stopping.set(true) new Thread("CoarseGrainedExecutorBackend-stop-executor") { override def run(): Unit = { // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally. // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180). // Therefore, we put this line in a new thread. executor.stop() } }.start() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (stopping.get()) { logInfo(s"Driver from $remoteAddress disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.") } else { logWarning(s"An unknown ($remoteAddress) driver disconnected.") } } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } } /** * This function can be overloaded by other child classes to handle * executor exits differently. For e.g. when an executor goes down, * back-end may not want to take the parent process down. */ protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = { if (throwable != null) { logError(reason, throwable) } else { logError(reason) } System.exit(code) } } private[spark] object CoarseGrainedExecutorBackend extends Logging { private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath: Seq[URL]) { Utils.initDaemon(log) SparkHadoopUtil.get.runAsSparkUser { () => // Debug code Utils.checkHost(hostname) // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf val port = executorConf.getInt("spark.executor.port", 0) val fetcher = RpcEnv.create( "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf), clientMode = true) val driver = fetcher.setupEndpointRefByURI(driverUrl) val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++ Seq[(String, String)](("spark.app.id", appId)) fetcher.shutdown() // Create SparkEnv using properties we fetched from the driver. val driverConf = new SparkConf() for ((key, value) <- props) { // this is required for SSL in standalone mode if (SparkConf.isExecutorStartupConf(key)) { driverConf.setIfMissing(key, value) } else { driverConf.set(key, value) } } if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) SparkHadoopUtil.get.startCredentialUpdater(driverConf) } val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } env.rpcEnv.awaitTermination() SparkHadoopUtil.get.stopCredentialUpdater() } } def main(args: Array[String]) { var driverUrl: String = null var executorId: String = null var hostname: String = null var cores: Int = 0 var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() var argv = args.toList while (!argv.isEmpty) { argv match { case ("--driver-url") :: value :: tail => driverUrl = value argv = tail case ("--executor-id") :: value :: tail => executorId = value argv = tail case ("--hostname") :: value :: tail => hostname = value argv = tail case ("--cores") :: value :: tail => cores = value.toInt argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail case ("--worker-url") :: value :: tail => // Worker url is used in spark standalone mode to enforce fate-sharing with worker workerUrl = Some(value) argv = tail case ("--user-class-path") :: value :: tail => userClassPath += new URL(value) argv = tail case Nil => case tail => // scalastyle:off println System.err.println(s"Unrecognized options: ${tail.mkString(" ")}") // scalastyle:on println printUsageAndExit() } } if (driverUrl == null || executorId == null || hostname == null || cores <= 0 || appId == null) { printUsageAndExit() } run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) System.exit(0) } private def printUsageAndExit() = { // scalastyle:off println System.err.println( """ |Usage: CoarseGrainedExecutorBackend [options] | | Options are: | --driver-url | --executor-id | --hostname | --cores | --app-id | --worker-url | --user-class-path |""".stripMargin) // scalastyle:on println System.exit(1) } }