diff options
author | Michael Gummelt <mgummelt@mesosphere.io> | 2016-08-26 12:25:22 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-26 12:25:22 -0700 |
commit | 8e5475be3c9a620f18f6712631b093464a7d0ee7 (patch) | |
tree | 417e25ea8798c0f9313285623a664fe7ac4fc003 /core/src/main | |
parent | c0949dc944b7e2fc8a4465acc68a8f2713b3fa13 (diff) | |
download | spark-8e5475be3c9a620f18f6712631b093464a7d0ee7.tar.gz spark-8e5475be3c9a620f18f6712631b093464a7d0ee7.tar.bz2 spark-8e5475be3c9a620f18f6712631b093464a7d0ee7.zip |
[SPARK-16967] move mesos to module
## What changes were proposed in this pull request?
Move Mesos code into a mvn module
## How was this patch tested?
unit tests
manually submitting a client mode and cluster mode job
spark/mesos integration test suite
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes #14637 from mgummelt/mesos-module.
Diffstat (limited to 'core/src/main')
19 files changed, 1 insertions, 3833 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2eaeab1d80..08d6343d62 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} -import org.apache.mesos.MesosNativeLibrary import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast @@ -56,7 +55,6 @@ import org.apache.spark.rdd._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} -import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump @@ -2512,18 +2510,6 @@ object SparkContext extends Logging { } (backend, scheduler) - case MESOS_REGEX(mesosUrl) => - MesosNativeLibrary.load() - val scheduler = new TaskSchedulerImpl(sc) - val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) - val backend = if (coarseGrained) { - new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager) - } else { - new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl) - } - scheduler.initialize(backend) - (backend, scheduler) - case masterUrl => val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr @@ -2545,7 +2531,7 @@ object SparkContext extends Logging { private def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = - ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) + ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " + s"for the url $url:") @@ -2566,8 +2552,6 @@ private object SparkMasterRegex { val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r // Regular expression for connecting to Spark deploy clusters val SPARK_REGEX = """spark://(.*)""".r - // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url - val MESOS_REGEX = """mesos://(.*)""".r } /** diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala index d232fae6b1..cbace7b5f9 100644 --- a/core/src/main/scala/org/apache/spark/TaskState.scala +++ b/core/src/main/scala/org/apache/spark/TaskState.scala @@ -17,8 +17,6 @@ package org.apache.spark -import org.apache.mesos.Protos.{TaskState => MesosTaskState} - private[spark] object TaskState extends Enumeration { val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value @@ -30,22 +28,4 @@ private[spark] object TaskState extends Enumeration { def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state) def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state) - - def toMesos(state: TaskState): MesosTaskState = state match { - case LAUNCHING => MesosTaskState.TASK_STARTING - case RUNNING => MesosTaskState.TASK_RUNNING - case FINISHED => MesosTaskState.TASK_FINISHED - case FAILED => MesosTaskState.TASK_FAILED - case KILLED => MesosTaskState.TASK_KILLED - case LOST => MesosTaskState.TASK_LOST - } - - def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { - case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING - case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING - case MesosTaskState.TASK_FINISHED => FINISHED - case MesosTaskState.TASK_FAILED => FAILED - case MesosTaskState.TASK_KILLED => KILLED - case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala deleted file mode 100644 index 73b6ca3844..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import java.util.concurrent.CountDownLatch - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.mesos.ui.MesosClusterUI -import org.apache.spark.deploy.rest.mesos.MesosRestServer -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.cluster.mesos._ -import org.apache.spark.util.{ShutdownHookManager, Utils} - -/* - * A dispatcher that is responsible for managing and launching drivers, and is intended to be - * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in - * the cluster independently of Spark applications. - * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a - * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master - * for resources. - * - * A typical new driver lifecycle is the following: - * - Driver submitted via spark-submit talking to the [[MesosRestServer]] - * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]] - * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue - * - * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable - * per driver launched. - * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as - * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and - * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively. - */ -private[mesos] class MesosClusterDispatcher( - args: MesosClusterDispatcherArguments, - conf: SparkConf) - extends Logging { - - private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host) - private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase() - logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode) - - private val engineFactory = recoveryMode match { - case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory - case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf) - case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode) - } - - private val scheduler = new MesosClusterScheduler(engineFactory, conf) - - private val server = new MesosRestServer(args.host, args.port, conf, scheduler) - private val webUi = new MesosClusterUI( - new SecurityManager(conf), - args.webUiPort, - conf, - publicAddress, - scheduler) - - private val shutdownLatch = new CountDownLatch(1) - - def start(): Unit = { - webUi.bind() - scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", webUi.activeWebUiUrl) - scheduler.start() - server.start() - } - - def awaitShutdown(): Unit = { - shutdownLatch.await() - } - - def stop(): Unit = { - webUi.stop() - server.stop() - scheduler.stop() - shutdownLatch.countDown() - } -} - -private[mesos] object MesosClusterDispatcher extends Logging { - def main(args: Array[String]) { - Utils.initDaemon(log) - val conf = new SparkConf - val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) - conf.setMaster(dispatcherArgs.masterUrl) - conf.setAppName(dispatcherArgs.name) - dispatcherArgs.zookeeperUrl.foreach { z => - conf.set("spark.deploy.recoveryMode", "ZOOKEEPER") - conf.set("spark.deploy.zookeeper.url", z) - } - val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) - dispatcher.start() - logDebug("Adding shutdown hook") // force eager creation of logger - ShutdownHookManager.addShutdownHook { () => - logInfo("Shutdown hook is shutting down dispatcher") - dispatcher.stop() - dispatcher.awaitShutdown() - } - dispatcher.awaitShutdown() - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala deleted file mode 100644 index 11e13441ee..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import scala.annotation.tailrec - -import org.apache.spark.SparkConf -import org.apache.spark.util.{IntParam, Utils} - - -private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) { - var host = Utils.localHostName() - var port = 7077 - var name = "Spark Cluster" - var webUiPort = 8081 - var masterUrl: String = _ - var zookeeperUrl: Option[String] = None - var propertiesFile: String = _ - - parse(args.toList) - - propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) - - @tailrec - private def parse(args: List[String]): Unit = args match { - case ("--host" | "-h") :: value :: tail => - Utils.checkHost(value, "Please use hostname " + value) - host = value - parse(tail) - - case ("--port" | "-p") :: IntParam(value) :: tail => - port = value - parse(tail) - - case ("--webui-port") :: IntParam(value) :: tail => - webUiPort = value - parse(tail) - - case ("--zk" | "-z") :: value :: tail => - zookeeperUrl = Some(value) - parse(tail) - - case ("--master" | "-m") :: value :: tail => - if (!value.startsWith("mesos://")) { - // scalastyle:off println - System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)") - // scalastyle:on println - System.exit(1) - } - masterUrl = value.stripPrefix("mesos://") - parse(tail) - - case ("--name") :: value :: tail => - name = value - parse(tail) - - case ("--properties-file") :: value :: tail => - propertiesFile = value - parse(tail) - - case ("--help") :: tail => - printUsageAndExit(0) - - case Nil => - if (masterUrl == null) { - // scalastyle:off println - System.err.println("--master is required") - // scalastyle:on println - printUsageAndExit(1) - } - - case _ => - printUsageAndExit(1) - } - - private def printUsageAndExit(exitCode: Int): Unit = { - // scalastyle:off println - System.err.println( - "Usage: MesosClusterDispatcher [options]\n" + - "\n" + - "Options:\n" + - " -h HOST, --host HOST Hostname to listen on\n" + - " -p PORT, --port PORT Port to listen on (default: 7077)\n" + - " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" + - " --name NAME Framework name to show in Mesos UI\n" + - " -m --master MASTER URI for connecting to Mesos master\n" + - " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" + - " Zookeeper for persistence\n" + - " --properties-file FILE Path to a custom Spark properties file.\n" + - " Default is conf/spark-defaults.conf.") - // scalastyle:on println - System.exit(exitCode) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala deleted file mode 100644 index d4c7022f00..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import java.util.Date - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.Command -import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState - -/** - * Describes a Spark driver that is submitted from the - * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by - * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. - * @param jarUrl URL to the application jar - * @param mem Amount of memory for the driver - * @param cores Number of cores for the driver - * @param supervise Supervise the driver for long running app - * @param command The command to launch the driver. - * @param schedulerProperties Extra properties to pass the Mesos scheduler - */ -private[spark] class MesosDriverDescription( - val name: String, - val jarUrl: String, - val mem: Int, - val cores: Double, - val supervise: Boolean, - val command: Command, - schedulerProperties: Map[String, String], - val submissionId: String, - val submissionDate: Date, - val retryState: Option[MesosClusterRetryState] = None) - extends Serializable { - - val conf = new SparkConf(false) - schedulerProperties.foreach {case (k, v) => conf.set(k, v)} - - def copy( - name: String = name, - jarUrl: String = jarUrl, - mem: Int = mem, - cores: Double = cores, - supervise: Boolean = supervise, - command: Command = command, - schedulerProperties: SparkConf = conf, - submissionId: String = submissionId, - submissionDate: Date = submissionDate, - retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = { - - new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap, - submissionId, submissionDate, retryState) - } - - override def toString: String = s"MesosDriverDescription (${command.mainClass})" -} diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala deleted file mode 100644 index 6b297c4600..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import java.nio.ByteBuffer -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.ExternalShuffleService -import org.apache.spark.internal.Logging -import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage -import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} -import org.apache.spark.network.util.TransportConf -import org.apache.spark.util.ThreadUtils - -/** - * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. - * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. - */ -private[mesos] class MesosExternalShuffleBlockHandler( - transportConf: TransportConf, - cleanerIntervalS: Long) - extends ExternalShuffleBlockHandler(transportConf, null) with Logging { - - ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") - .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) - - // Stores a map of app id to app state (timeout value and last heartbeat) - private val connectedApps = new ConcurrentHashMap[String, AppState]() - - protected override def handleMessage( - message: BlockTransferMessage, - client: TransportClient, - callback: RpcResponseCallback): Unit = { - message match { - case RegisterDriverParam(appId, appState) => - val address = client.getSocketAddress - val timeout = appState.heartbeatTimeout - logInfo(s"Received registration request from app $appId (remote address $address, " + - s"heartbeat timeout $timeout ms).") - if (connectedApps.containsKey(appId)) { - logWarning(s"Received a registration request from app $appId, but it was already " + - s"registered") - } - connectedApps.put(appId, appState) - callback.onSuccess(ByteBuffer.allocate(0)) - case Heartbeat(appId) => - val address = client.getSocketAddress - Option(connectedApps.get(appId)) match { - case Some(existingAppState) => - logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + - s"address $address).") - existingAppState.lastHeartbeat = System.nanoTime() - case None => - logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + - s"address $address, appId '$appId').") - } - case _ => super.handleMessage(message, client, callback) - } - } - - /** An extractor object for matching [[RegisterDriver]] message. */ - private object RegisterDriverParam { - def unapply(r: RegisterDriver): Option[(String, AppState)] = - Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) - } - - private object Heartbeat { - def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) - } - - private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) - - private class CleanerThread extends Runnable { - override def run(): Unit = { - val now = System.nanoTime() - connectedApps.asScala.foreach { case (appId, appState) => - if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { - logInfo(s"Application $appId timed out. Removing shuffle files.") - connectedApps.remove(appId) - applicationRemoved(appId, true) - } - } - } - } -} - -/** - * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers - * to associate with. This allows the shuffle service to detect when a driver is terminated - * and can clean up the associated shuffle files. - */ -private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager) - extends ExternalShuffleService(conf, securityManager) { - - protected override def newShuffleBlockHandler( - conf: TransportConf): ExternalShuffleBlockHandler = { - val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s") - new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) - } -} - -private[spark] object MesosExternalShuffleService extends Logging { - - def main(args: Array[String]): Unit = { - ExternalShuffleService.main(args, - (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm)) - } -} - - diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala deleted file mode 100644 index cd98110ddc..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos.ui - -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.spark.deploy.Command -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState} -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") { - - override def render(request: HttpServletRequest): Seq[Node] = { - val driverId = request.getParameter("id") - require(driverId != null && driverId.nonEmpty, "Missing id parameter") - - val state = parent.scheduler.getDriverState(driverId) - if (state.isEmpty) { - val content = - <div> - <p>Cannot find driver {driverId}</p> - </div> - return UIUtils.basicSparkPage(content, s"Details for Job $driverId") - } - val driverState = state.get - val driverHeaders = Seq("Driver property", "Value") - val schedulerHeaders = Seq("Scheduler property", "Value") - val commandEnvHeaders = Seq("Command environment variable", "Value") - val launchedHeaders = Seq("Launched property", "Value") - val commandHeaders = Seq("Command property", "Value") - val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count") - val driverDescription = Iterable.apply(driverState.description) - val submissionState = Iterable.apply(driverState.submissionState) - val command = Iterable.apply(driverState.description.command) - val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap) - val commandEnv = Iterable.apply(driverState.description.command.environment) - val driverTable = - UIUtils.listingTable(driverHeaders, driverRow, driverDescription) - val commandTable = - UIUtils.listingTable(commandHeaders, commandRow, command) - val commandEnvTable = - UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv) - val schedulerTable = - UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties) - val launchedTable = - UIUtils.listingTable(launchedHeaders, launchedRow, submissionState) - val retryTable = - UIUtils.listingTable( - retryHeaders, retryRow, Iterable.apply(driverState.description.retryState)) - val content = - <p>Driver state information for driver id {driverId}</p> - <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a> - <div class="row-fluid"> - <div class="span12"> - <h4>Driver state: {driverState.state}</h4> - <h4>Driver properties</h4> - {driverTable} - <h4>Driver command</h4> - {commandTable} - <h4>Driver command environment</h4> - {commandEnvTable} - <h4>Scheduler properties</h4> - {schedulerTable} - <h4>Launched state</h4> - {launchedTable} - <h4>Retry state</h4> - {retryTable} - </div> - </div>; - - UIUtils.basicSparkPage(content, s"Details for Job $driverId") - } - - private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = { - submissionState.map { state => - <tr> - <td>Mesos Slave ID</td> - <td>{state.slaveId.getValue}</td> - </tr> - <tr> - <td>Mesos Task ID</td> - <td>{state.taskId.getValue}</td> - </tr> - <tr> - <td>Launch Time</td> - <td>{state.startDate}</td> - </tr> - <tr> - <td>Finish Time</td> - <td>{state.finishDate.map(_.toString).getOrElse("")}</td> - </tr> - <tr> - <td>Last Task Status</td> - <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> - </tr> - }.getOrElse(Seq[Node]()) - } - - private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { - properties.map { case (k, v) => - <tr> - <td>{k}</td><td>{v}</td> - </tr> - }.toSeq - } - - private def commandRow(command: Command): Seq[Node] = { - <tr> - <td>Main class</td><td>{command.mainClass}</td> - </tr> - <tr> - <td>Arguments</td><td>{command.arguments.mkString(" ")}</td> - </tr> - <tr> - <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td> - </tr> - <tr> - <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td> - </tr> - <tr> - <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td> - </tr> - } - - private def driverRow(driver: MesosDriverDescription): Seq[Node] = { - <tr> - <td>Name</td><td>{driver.name}</td> - </tr> - <tr> - <td>Id</td><td>{driver.submissionId}</td> - </tr> - <tr> - <td>Cores</td><td>{driver.cores}</td> - </tr> - <tr> - <td>Memory</td><td>{driver.mem}</td> - </tr> - <tr> - <td>Submitted</td><td>{driver.submissionDate}</td> - </tr> - <tr> - <td>Supervise</td><td>{driver.supervise}</td> - </tr> - } - - private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = { - retryState.map { state => - <tr> - <td> - {state.lastFailureStatus} - </td> - <td> - {state.nextRetry} - </td> - <td> - {state.retries} - </td> - </tr> - }.getOrElse(Seq[Node]()) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala deleted file mode 100644 index 8dcbdaad86..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos.ui - -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.mesos.Protos.TaskStatus - -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") { - private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url") - - def render(request: HttpServletRequest): Seq[Node] = { - val state = parent.scheduler.getSchedulerState() - - val driverHeader = Seq("Driver ID") - val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil) - val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources") - - val queuedHeaders = driverHeader ++ submissionHeader - val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++ - Seq("Start Date", "Mesos Slave ID", "State") - val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++ - Seq("Last Failed Status", "Next Retry Time", "Attempt Count") - val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers) - val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers) - val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers) - val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers) - val content = - <p>Mesos Framework ID: {state.frameworkId}</p> - <div class="row-fluid"> - <div class="span12"> - <h4>Queued Drivers:</h4> - {queuedTable} - <h4>Launched Drivers:</h4> - {launchedTable} - <h4>Finished Drivers:</h4> - {finishedTable} - <h4>Supervise drivers waiting for retry:</h4> - {retryTable} - </div> - </div>; - UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster") - } - - private def queuedRow(submission: MesosDriverDescription): Seq[Node] = { - val id = submission.submissionId - <tr> - <td><a href={s"driver?id=$id"}>{id}</a></td> - <td>{submission.submissionDate}</td> - <td>{submission.command.mainClass}</td> - <td>cpus: {submission.cores}, mem: {submission.mem}</td> - </tr> - } - - private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { - val id = state.driverDescription.submissionId - - val historyCol = if (historyServerURL.isDefined) { - <td> - <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}> - {state.frameworkId} - </a> - </td> - } else Nil - - <tr> - <td><a href={s"driver?id=$id"}>{id}</a></td> - {historyCol} - <td>{state.driverDescription.submissionDate}</td> - <td>{state.driverDescription.command.mainClass}</td> - <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td> - <td>{state.startDate}</td> - <td>{state.slaveId.getValue}</td> - <td>{stateString(state.mesosTaskStatus)}</td> - </tr> - } - - private def retryRow(submission: MesosDriverDescription): Seq[Node] = { - val id = submission.submissionId - <tr> - <td><a href={s"driver?id=$id"}>{id}</a></td> - <td>{submission.submissionDate}</td> - <td>{submission.command.mainClass}</td> - <td>{submission.retryState.get.lastFailureStatus}</td> - <td>{submission.retryState.get.nextRetry}</td> - <td>{submission.retryState.get.retries}</td> - </tr> - } - - private def stateString(status: Option[TaskStatus]): String = { - if (status.isEmpty) { - return "" - } - val sb = new StringBuilder - val s = status.get - sb.append(s"State: ${s.getState}") - if (status.get.hasMessage) { - sb.append(s", Message: ${s.getMessage}") - } - if (status.get.hasHealthy) { - sb.append(s", Healthy: ${s.getHealthy}") - } - if (status.get.hasSource) { - sb.append(s", Source: ${s.getSource}") - } - if (status.get.hasReason) { - sb.append(s", Reason: ${s.getReason}") - } - if (status.get.hasTimestamp) { - sb.append(s", Time: ${s.getTimestamp}") - } - sb.toString() - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala deleted file mode 100644 index 604978967d..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos.ui - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler -import org.apache.spark.ui.{SparkUI, WebUI} -import org.apache.spark.ui.JettyUtils._ - -/** - * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]] - */ -private[spark] class MesosClusterUI( - securityManager: SecurityManager, - port: Int, - val conf: SparkConf, - dispatcherPublicAddress: String, - val scheduler: MesosClusterScheduler) - extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { - - initialize() - - def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort - - override def initialize() { - attachPage(new MesosClusterPage(this)) - attachPage(new DriverPage(this)) - attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static")) - } -} - -private object MesosClusterUI { - val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR -} diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala deleted file mode 100644 index 3b96488a12..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.rest.mesos - -import java.io.File -import java.text.SimpleDateFormat -import java.util.Date -import java.util.concurrent.atomic.AtomicLong -import javax.servlet.http.HttpServletResponse - -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} -import org.apache.spark.deploy.Command -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.deploy.rest._ -import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler -import org.apache.spark.util.Utils - -/** - * A server that responds to requests submitted by the [[RestSubmissionClient]]. - * All requests are forwarded to - * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. - * This is intended to be used in Mesos cluster mode only. - * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs. - */ -private[spark] class MesosRestServer( - host: String, - requestedPort: Int, - masterConf: SparkConf, - scheduler: MesosClusterScheduler) - extends RestSubmissionServer(host, requestedPort, masterConf) { - - protected override val submitRequestServlet = - new MesosSubmitRequestServlet(scheduler, masterConf) - protected override val killRequestServlet = - new MesosKillRequestServlet(scheduler, masterConf) - protected override val statusRequestServlet = - new MesosStatusRequestServlet(scheduler, masterConf) -} - -private[mesos] class MesosSubmitRequestServlet( - scheduler: MesosClusterScheduler, - conf: SparkConf) - extends SubmitRequestServlet { - - private val DEFAULT_SUPERVISE = false - private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb - private val DEFAULT_CORES = 1.0 - - private val nextDriverNumber = new AtomicLong(0) - private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs - private def newDriverId(submitDate: Date): String = { - "driver-%s-%04d".format( - createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet()) - } - - /** - * Build a driver description from the fields specified in the submit request. - * - * This involves constructing a command that launches a mesos framework for the job. - * This does not currently consider fields used by python applications since python - * is not supported in mesos cluster mode yet. - */ - private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = { - // Required fields, including the main class because python is not yet supported - val appResource = Option(request.appResource).getOrElse { - throw new SubmitRestMissingFieldException("Application jar is missing.") - } - val mainClass = Option(request.mainClass).getOrElse { - throw new SubmitRestMissingFieldException("Main class is missing.") - } - - // Optional fields - val sparkProperties = request.sparkProperties - val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") - val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") - val superviseDriver = sparkProperties.get("spark.driver.supervise") - val driverMemory = sparkProperties.get("spark.driver.memory") - val driverCores = sparkProperties.get("spark.driver.cores") - val appArgs = request.appArgs - val environmentVariables = request.environmentVariables - val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) - - // Construct driver description - val conf = new SparkConf(false).setAll(sparkProperties) - val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) - val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) - val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf) - val javaOpts = sparkJavaOpts ++ extraJavaOpts - val command = new Command( - mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) - val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) - val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) - val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) - val submitDate = new Date() - val submissionId = newDriverId(submitDate) - - new MesosDriverDescription( - name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, - command, request.sparkProperties, submissionId, submitDate) - } - - protected override def handleSubmit( - requestMessageJson: String, - requestMessage: SubmitRestProtocolMessage, - responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { - requestMessage match { - case submitRequest: CreateSubmissionRequest => - val driverDescription = buildDriverDescription(submitRequest) - val s = scheduler.submitDriver(driverDescription) - s.serverSparkVersion = sparkVersion - val unknownFields = findUnknownFields(requestMessageJson, requestMessage) - if (unknownFields.nonEmpty) { - // If there are fields that the server does not know about, warn the client - s.unknownFields = unknownFields - } - s - case unexpected => - responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST) - handleError(s"Received message of unexpected type ${unexpected.messageType}.") - } - } -} - -private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) - extends KillRequestServlet { - protected override def handleKill(submissionId: String): KillSubmissionResponse = { - val k = scheduler.killDriver(submissionId) - k.serverSparkVersion = sparkVersion - k - } -} - -private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) - extends StatusRequestServlet { - protected override def handleStatus(submissionId: String): SubmissionStatusResponse = { - val d = scheduler.getDriverStatus(submissionId) - d.serverSparkVersion = sparkVersion - d - } -} diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala deleted file mode 100644 index 680cfb733e..0000000000 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.executor - -import java.nio.ByteBuffer - -import scala.collection.JavaConverters._ - -import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver} -import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} -import org.apache.mesos.protobuf.ByteString - -import org.apache.spark.{SparkConf, SparkEnv, TaskState} -import org.apache.spark.TaskState.TaskState -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData -import org.apache.spark.util.Utils - -private[spark] class MesosExecutorBackend - extends MesosExecutor - with ExecutorBackend - with Logging { - - var executor: Executor = null - var driver: ExecutorDriver = null - - override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { - val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() - driver.sendStatusUpdate(MesosTaskStatus.newBuilder() - .setTaskId(mesosTaskId) - .setState(TaskState.toMesos(state)) - .setData(ByteString.copyFrom(data)) - .build()) - } - - override def registered( - driver: ExecutorDriver, - executorInfo: ExecutorInfo, - frameworkInfo: FrameworkInfo, - slaveInfo: SlaveInfo) { - - // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend. - val cpusPerTask = executorInfo.getResourcesList.asScala - .find(_.getName == "cpus") - .map(_.getScalar.getValue.toInt) - .getOrElse(0) - val executorId = executorInfo.getExecutorId.getValue - - logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus") - this.driver = driver - // Set a context class loader to be picked up by the serializer. Without this call - // the serializer would default to the null class loader, and fail to find Spark classes - // See SPARK-10986. - Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader) - - val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ - Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) - val conf = new SparkConf(loadDefaults = true).setAll(properties) - val port = conf.getInt("spark.executor.port", 0) - val env = SparkEnv.createExecutorEnv( - conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) - - executor = new Executor( - executorId, - slaveInfo.getHostname, - env) - } - - override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { - val taskId = taskInfo.getTaskId.getValue.toLong - val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData) - if (executor == null) { - logError("Received launchTask but executor was null") - } else { - SparkHadoopUtil.get.runAsSparkUser { () => - executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber, - taskInfo.getName, taskData.serializedTask) - } - } - } - - override def error(d: ExecutorDriver, message: String) { - logError("Error from Mesos: " + message) - } - - override def killTask(d: ExecutorDriver, t: TaskID) { - if (executor == null) { - logError("Received KillTask but executor was null") - } else { - // TODO: Determine the 'interruptOnCancel' property set for the given job. - executor.killTask(t.getValue.toLong, interruptThread = false) - } - } - - override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} - - override def disconnected(d: ExecutorDriver) {} - - override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} - - override def shutdown(d: ExecutorDriver) {} -} - -/** - * Entry point for Mesos executor. - */ -private[spark] object MesosExecutorBackend extends Logging { - def main(args: Array[String]) { - Utils.initDaemon(log) - // Create a new Executor and start it running - val runner = new MesosExecutorBackend() - new MesosExecutorDriver(runner).run() - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala deleted file mode 100644 index 61ab3e87c5..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import scala.collection.JavaConverters._ - -import org.apache.curator.framework.CuratorFramework -import org.apache.zookeeper.CreateMode -import org.apache.zookeeper.KeeperException.NoNodeException - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkCuratorUtil -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - -/** - * Persistence engine factory that is responsible for creating new persistence engines - * to store Mesos cluster mode state. - */ -private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) { - def createEngine(path: String): MesosClusterPersistenceEngine -} - -/** - * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode - * specific state, so that on failover all the state can be recovered and the scheduler - * can resume managing the drivers. - */ -private[spark] trait MesosClusterPersistenceEngine { - def persist(name: String, obj: Object): Unit - def expunge(name: String): Unit - def fetch[T](name: String): Option[T] - def fetchAll[T](): Iterable[T] -} - -/** - * Zookeeper backed persistence engine factory. - * All Zk engines created from this factory shares the same Zookeeper client, so - * all of them reuses the same connection pool. - */ -private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf) - extends MesosClusterPersistenceEngineFactory(conf) with Logging { - - lazy val zk = SparkCuratorUtil.newClient(conf) - - def createEngine(path: String): MesosClusterPersistenceEngine = { - new ZookeeperMesosClusterPersistenceEngine(path, zk, conf) - } -} - -/** - * Black hole persistence engine factory that creates black hole - * persistence engines, which stores nothing. - */ -private[spark] class BlackHoleMesosClusterPersistenceEngineFactory - extends MesosClusterPersistenceEngineFactory(null) { - def createEngine(path: String): MesosClusterPersistenceEngine = { - new BlackHoleMesosClusterPersistenceEngine - } -} - -/** - * Black hole persistence engine that stores nothing. - */ -private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine { - override def persist(name: String, obj: Object): Unit = {} - override def fetch[T](name: String): Option[T] = None - override def expunge(name: String): Unit = {} - override def fetchAll[T](): Iterable[T] = Iterable.empty[T] -} - -/** - * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state - * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but - * reuses a shared Zookeeper client. - */ -private[spark] class ZookeeperMesosClusterPersistenceEngine( - baseDir: String, - zk: CuratorFramework, - conf: SparkConf) - extends MesosClusterPersistenceEngine with Logging { - private val WORKING_DIR = - conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir - - SparkCuratorUtil.mkdir(zk, WORKING_DIR) - - def path(name: String): String = { - WORKING_DIR + "/" + name - } - - override def expunge(name: String): Unit = { - zk.delete().forPath(path(name)) - } - - override def persist(name: String, obj: Object): Unit = { - val serialized = Utils.serialize(obj) - val zkPath = path(name) - zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized) - } - - override def fetch[T](name: String): Option[T] = { - val zkPath = path(name) - - try { - val fileData = zk.getData().forPath(zkPath) - Some(Utils.deserialize[T](fileData)) - } catch { - case e: NoNodeException => None - case e: Exception => - logWarning("Exception while reading persisted file, deleting", e) - zk.delete().forPath(zkPath) - None - } - } - - override def fetchAll[T](): Iterable[T] = { - zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T]) - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala deleted file mode 100644 index bb6f6b3e3f..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ /dev/null @@ -1,745 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{Collections, Date, List => JList} - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.mesos.{Scheduler, SchedulerDriver} -import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} -import org.apache.mesos.Protos.Environment.Variable -import org.apache.mesos.Protos.TaskStatus.Reason - -import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} -import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.util.Utils - -/** - * Tracks the current state of a Mesos Task that runs a Spark driver. - * @param driverDescription Submitted driver description from - * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]] - * @param taskId Mesos TaskID generated for the task - * @param slaveId Slave ID that the task is assigned to - * @param mesosTaskStatus The last known task status update. - * @param startDate The date the task was launched - * @param finishDate The date the task finished - * @param frameworkId Mesos framework ID the task registers with - */ -private[spark] class MesosClusterSubmissionState( - val driverDescription: MesosDriverDescription, - val taskId: TaskID, - val slaveId: SlaveID, - var mesosTaskStatus: Option[TaskStatus], - var startDate: Date, - var finishDate: Option[Date], - val frameworkId: String) - extends Serializable { - - def copy(): MesosClusterSubmissionState = { - new MesosClusterSubmissionState( - driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId) - } -} - -/** - * Tracks the retry state of a driver, which includes the next time it should be scheduled - * and necessary information to do exponential backoff. - * This class is not thread-safe, and we expect the caller to handle synchronizing state. - * - * @param lastFailureStatus Last Task status when it failed. - * @param retries Number of times it has been retried. - * @param nextRetry Time at which it should be retried next - * @param waitTime The amount of time driver is scheduled to wait until next retry. - */ -private[spark] class MesosClusterRetryState( - val lastFailureStatus: TaskStatus, - val retries: Int, - val nextRetry: Date, - val waitTime: Int) extends Serializable { - def copy(): MesosClusterRetryState = - new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime) -} - -/** - * The full state of the cluster scheduler, currently being used for displaying - * information on the UI. - * - * @param frameworkId Mesos Framework id for the cluster scheduler. - * @param masterUrl The Mesos master url - * @param queuedDrivers All drivers queued to be launched - * @param launchedDrivers All launched or running drivers - * @param finishedDrivers All terminated drivers - * @param pendingRetryDrivers All drivers pending to be retried - */ -private[spark] class MesosClusterSchedulerState( - val frameworkId: String, - val masterUrl: Option[String], - val queuedDrivers: Iterable[MesosDriverDescription], - val launchedDrivers: Iterable[MesosClusterSubmissionState], - val finishedDrivers: Iterable[MesosClusterSubmissionState], - val pendingRetryDrivers: Iterable[MesosDriverDescription]) - -/** - * The full state of a Mesos driver, that is being used to display driver information on the UI. - */ -private[spark] class MesosDriverState( - val state: String, - val description: MesosDriverDescription, - val submissionState: Option[MesosClusterSubmissionState] = None) - -/** - * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode - * as Mesos tasks in a Mesos cluster. - * All drivers are launched asynchronously by the framework, which will eventually be launched - * by one of the slaves in the cluster. The results of the driver will be stored in slave's task - * sandbox which is accessible by visiting the Mesos UI. - * This scheduler supports recovery by persisting all its state and performs task reconciliation - * on recover, which gets all the latest state for all the drivers from Mesos master. - */ -private[spark] class MesosClusterScheduler( - engineFactory: MesosClusterPersistenceEngineFactory, - conf: SparkConf) - extends Scheduler with MesosSchedulerUtils { - var frameworkUrl: String = _ - private val metricsSystem = - MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf)) - private val master = conf.get("spark.master") - private val appName = conf.get("spark.app.name") - private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200) - private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200) - private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute - private val schedulerState = engineFactory.createEngine("scheduler") - private val stateLock = new Object() - private val finishedDrivers = - new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers) - private var frameworkId: String = null - // Holds all the launched drivers and current launch state, keyed by driver id. - private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]() - // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation. - // All drivers that are loaded after failover are added here, as we need get the latest - // state of the tasks from Mesos. - private val pendingRecover = new mutable.HashMap[String, SlaveID]() - // Stores all the submitted drivers that hasn't been launched. - private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]() - // All supervised drivers that are waiting to retry after termination. - private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]() - private val queuedDriversState = engineFactory.createEngine("driverQueue") - private val launchedDriversState = engineFactory.createEngine("launchedDrivers") - private val pendingRetryDriversState = engineFactory.createEngine("retryList") - // Flag to mark if the scheduler is ready to be called, which is until the scheduler - // is registered with Mesos master. - @volatile protected var ready = false - private var masterInfo: Option[MasterInfo] = None - - def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = { - val c = new CreateSubmissionResponse - if (!ready) { - c.success = false - c.message = "Scheduler is not ready to take requests" - return c - } - - stateLock.synchronized { - if (isQueueFull()) { - c.success = false - c.message = "Already reached maximum submission size" - return c - } - c.submissionId = desc.submissionId - queuedDriversState.persist(desc.submissionId, desc) - queuedDrivers += desc - c.success = true - } - c - } - - def killDriver(submissionId: String): KillSubmissionResponse = { - val k = new KillSubmissionResponse - if (!ready) { - k.success = false - k.message = "Scheduler is not ready to take requests" - return k - } - k.submissionId = submissionId - stateLock.synchronized { - // We look for the requested driver in the following places: - // 1. Check if submission is running or launched. - // 2. Check if it's still queued. - // 3. Check if it's in the retry list. - // 4. Check if it has already completed. - if (launchedDrivers.contains(submissionId)) { - val task = launchedDrivers(submissionId) - mesosDriver.killTask(task.taskId) - k.success = true - k.message = "Killing running driver" - } else if (removeFromQueuedDrivers(submissionId)) { - k.success = true - k.message = "Removed driver while it's still pending" - } else if (removeFromPendingRetryDrivers(submissionId)) { - k.success = true - k.message = "Removed driver while it's being retried" - } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) { - k.success = false - k.message = "Driver already terminated" - } else { - k.success = false - k.message = "Cannot find driver" - } - } - k - } - - def getDriverStatus(submissionId: String): SubmissionStatusResponse = { - val s = new SubmissionStatusResponse - if (!ready) { - s.success = false - s.message = "Scheduler is not ready to take requests" - return s - } - s.submissionId = submissionId - stateLock.synchronized { - if (queuedDrivers.exists(_.submissionId.equals(submissionId))) { - s.success = true - s.driverState = "QUEUED" - } else if (launchedDrivers.contains(submissionId)) { - s.success = true - s.driverState = "RUNNING" - launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString) - } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) { - s.success = true - s.driverState = "FINISHED" - finishedDrivers - .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus - .foreach(state => s.message = state.toString) - } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) { - val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId)) - .get.retryState.get.lastFailureStatus - s.success = true - s.driverState = "RETRYING" - s.message = status.toString - } else { - s.success = false - s.driverState = "NOT_FOUND" - } - } - s - } - - /** - * Gets the driver state to be displayed on the Web UI. - */ - def getDriverState(submissionId: String): Option[MesosDriverState] = { - stateLock.synchronized { - queuedDrivers.find(_.submissionId.equals(submissionId)) - .map(d => new MesosDriverState("QUEUED", d)) - .orElse(launchedDrivers.get(submissionId) - .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d)))) - .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId)) - .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d)))) - .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId)) - .map(d => new MesosDriverState("RETRYING", d))) - } - } - - private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity - - /** - * Recover scheduler state that is persisted. - * We still need to do task reconciliation to be up to date of the latest task states - * as it might have changed while the scheduler is failing over. - */ - private def recoverState(): Unit = { - stateLock.synchronized { - launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state => - launchedDrivers(state.taskId.getValue) = state - pendingRecover(state.taskId.getValue) = state.slaveId - } - queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d) - // There is potential timing issue where a queued driver might have been launched - // but the scheduler shuts down before the queued driver was able to be removed - // from the queue. We try to mitigate this issue by walking through all queued drivers - // and remove if they're already launched. - queuedDrivers - .filter(d => launchedDrivers.contains(d.submissionId)) - .foreach(d => removeFromQueuedDrivers(d.submissionId)) - pendingRetryDriversState.fetchAll[MesosDriverDescription]() - .foreach(s => pendingRetryDrivers += s) - // TODO: Consider storing finished drivers so we can show them on the UI after - // failover. For now we clear the history on each recovery. - finishedDrivers.clear() - } - } - - /** - * Starts the cluster scheduler and wait until the scheduler is registered. - * This also marks the scheduler to be ready for requests. - */ - def start(): Unit = { - // TODO: Implement leader election to make sure only one framework running in the cluster. - val fwId = schedulerState.fetch[String]("frameworkId") - fwId.foreach { id => - frameworkId = id - } - recoverState() - metricsSystem.registerSource(new MesosClusterSchedulerSource(this)) - metricsSystem.start() - val driver = createSchedulerDriver( - master, - MesosClusterScheduler.this, - Utils.getCurrentUserName(), - appName, - conf, - Some(frameworkUrl), - Some(true), - Some(Integer.MAX_VALUE), - fwId) - - startScheduler(driver) - ready = true - } - - def stop(): Unit = { - ready = false - metricsSystem.report() - metricsSystem.stop() - mesosDriver.stop(true) - } - - override def registered( - driver: SchedulerDriver, - newFrameworkId: FrameworkID, - masterInfo: MasterInfo): Unit = { - logInfo("Registered as framework ID " + newFrameworkId.getValue) - if (newFrameworkId.getValue != frameworkId) { - frameworkId = newFrameworkId.getValue - schedulerState.persist("frameworkId", frameworkId) - } - markRegistered() - - stateLock.synchronized { - this.masterInfo = Some(masterInfo) - if (!pendingRecover.isEmpty) { - // Start task reconciliation if we need to recover. - val statuses = pendingRecover.collect { - case (taskId, slaveId) => - val newStatus = TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId).build()) - .setSlaveId(slaveId) - .setState(MesosTaskState.TASK_STAGING) - .build() - launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus)) - .getOrElse(newStatus) - } - // TODO: Page the status updates to avoid trying to reconcile - // a large amount of tasks at once. - driver.reconcileTasks(statuses.toSeq.asJava) - } - } - } - - private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = { - desc.conf.getOption("spark.executor.uri") - .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) - } - - private def getDriverFrameworkID(desc: MesosDriverDescription): String = { - s"${frameworkId}-${desc.submissionId}" - } - - private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = { - m.updated(k, f(m.getOrElse(k, default))) - } - - private def getDriverEnvironment(desc: MesosDriverDescription): Environment = { - // TODO(mgummelt): Don't do this here. This should be passed as a --conf - val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")( - v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}" - ) - - val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv - - val envBuilder = Environment.newBuilder() - env.foreach { case (k, v) => - envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v)) - } - envBuilder.build() - } - - private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = { - val confUris = List(conf.getOption("spark.mesos.uris"), - desc.conf.getOption("spark.mesos.uris"), - desc.conf.getOption("spark.submit.pyFiles")).flatMap( - _.map(_.split(",").map(_.trim)) - ).flatten - - val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:") - - ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri => - CommandInfo.URI.newBuilder().setValue(uri.trim()).build()) - } - - private def getDriverCommandValue(desc: MesosDriverDescription): String = { - val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image") - val executorUri = getDriverExecutorURI(desc) - // Gets the path to run spark-submit, and the path to the Mesos sandbox. - val (executable, sandboxPath) = if (dockerDefined) { - // Application jar is automatically downloaded in the mounted sandbox by Mesos, - // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable. - ("./bin/spark-submit", "$MESOS_SANDBOX") - } else if (executorUri.isDefined) { - val folderBasename = executorUri.get.split('/').last.split('.').head - - val entries = conf.getOption("spark.executor.extraLibraryPath") - .map(path => Seq(path) ++ desc.command.libraryPathEntries) - .getOrElse(desc.command.libraryPathEntries) - - val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else "" - - val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit" - // Sandbox path points to the parent folder as we chdir into the folderBasename. - (cmdExecutable, "..") - } else { - val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home") - .orElse(conf.getOption("spark.home")) - .orElse(Option(System.getenv("SPARK_HOME"))) - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath - // Sandbox points to the current directory by default with Mesos. - (cmdExecutable, ".") - } - val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ") - val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString() - val appArguments = desc.command.arguments.mkString(" ") - - s"$executable $cmdOptions $primaryResource $appArguments" - } - - private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = { - val builder = CommandInfo.newBuilder() - builder.setValue(getDriverCommandValue(desc)) - builder.setEnvironment(getDriverEnvironment(desc)) - builder.addAllUris(getDriverUris(desc).asJava) - builder.build() - } - - private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = { - var options = Seq( - "--name", desc.conf.get("spark.app.name"), - "--master", s"mesos://${conf.get("spark.master")}", - "--driver-cores", desc.cores.toString, - "--driver-memory", s"${desc.mem}M") - - // Assume empty main class means we're running python - if (!desc.command.mainClass.equals("")) { - options ++= Seq("--class", desc.command.mainClass) - } - - desc.conf.getOption("spark.executor.memory").foreach { v => - options ++= Seq("--executor-memory", v) - } - desc.conf.getOption("spark.cores.max").foreach { v => - options ++= Seq("--total-executor-cores", v) - } - desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles => - val formattedFiles = pyFiles.split(",") - .map { path => new File(sandboxPath, path.split("/").last).toString() } - .mkString(",") - options ++= Seq("--py-files", formattedFiles) - } - - // --conf - val replicatedOptionsBlacklist = Set( - "spark.jars", // Avoids duplicate classes in classpath - "spark.submit.deployMode", // this would be set to `cluster`, but we need client - "spark.master" // this contains the address of the dispatcher, not master - ) - val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap - val driverConf = desc.conf.getAll - .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) } - .toMap - (defaultConf ++ driverConf).foreach { case (key, value) => - options ++= Seq("--conf", s"$key=${shellEscape(value)}") } - - options - } - - /** - * Escape args for Unix-like shells, unless already quoted by the user. - * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html - * and http://www.grymoire.com/Unix/Quote.html - * - * @param value argument - * @return escaped argument - */ - private[scheduler] def shellEscape(value: String): String = { - val WrappedInQuotes = """^(".+"|'.+')$""".r - val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r - value match { - case WrappedInQuotes(c) => value // The user quoted his args, don't touch it! - case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\"" - case _: String => value // Don't touch harmless strings - } - } - - private class ResourceOffer( - val offerId: OfferID, - val slaveId: SlaveID, - var resources: JList[Resource]) { - override def toString(): String = { - s"Offer id: ${offerId}, resources: ${resources}" - } - } - - private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = { - val taskId = TaskID.newBuilder().setValue(desc.submissionId).build() - - val (remainingResources, cpuResourcesToUse) = - partitionResources(offer.resources, "cpus", desc.cores) - val (finalResources, memResourcesToUse) = - partitionResources(remainingResources.asJava, "mem", desc.mem) - offer.resources = finalResources.asJava - - val appName = desc.conf.get("spark.app.name") - val taskInfo = TaskInfo.newBuilder() - .setTaskId(taskId) - .setName(s"Driver for ${appName}") - .setSlaveId(offer.slaveId) - .setCommand(buildDriverCommand(desc)) - .addAllResources(cpuResourcesToUse.asJava) - .addAllResources(memResourcesToUse.asJava) - - desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image, - desc.conf, - taskInfo.getContainerBuilder) - } - - taskInfo.build - } - - /** - * This method takes all the possible candidates and attempt to schedule them with Mesos offers. - * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled - * logic on each task. - */ - private def scheduleTasks( - candidates: Seq[MesosDriverDescription], - afterLaunchCallback: (String) => Boolean, - currentOffers: List[ResourceOffer], - tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = { - for (submission <- candidates) { - val driverCpu = submission.cores - val driverMem = submission.mem - logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem") - val offerOption = currentOffers.find { o => - getResource(o.resources, "cpus") >= driverCpu && - getResource(o.resources, "mem") >= driverMem - } - if (offerOption.isEmpty) { - logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " + - s"cpu: $driverCpu, mem: $driverMem") - } else { - val offer = offerOption.get - val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo]) - val task = createTaskInfo(submission, offer) - queuedTasks += task - logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + - submission.submissionId) - val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, - None, new Date(), None, getDriverFrameworkID(submission)) - launchedDrivers(submission.submissionId) = newState - launchedDriversState.persist(submission.submissionId, newState) - afterLaunchCallback(submission.submissionId) - } - } - } - - override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { - logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}") - val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() - val currentTime = new Date() - - val currentOffers = offers.asScala.map { - o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList) - }.toList - - stateLock.synchronized { - // We first schedule all the supervised drivers that are ready to retry. - // This list will be empty if none of the drivers are marked as supervise. - val driversToRetry = pendingRetryDrivers.filter { d => - d.retryState.get.nextRetry.before(currentTime) - } - - scheduleTasks( - copyBuffer(driversToRetry), - removeFromPendingRetryDrivers, - currentOffers, - tasks) - - // Then we walk through the queued drivers and try to schedule them. - scheduleTasks( - copyBuffer(queuedDrivers), - removeFromQueuedDrivers, - currentOffers, - tasks) - } - tasks.foreach { case (offerId, taskInfos) => - driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava) - } - - for (o <- currentOffers if !tasks.contains(o.offerId)) { - driver.declineOffer(o.offerId) - } - } - - private def copyBuffer( - buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { - val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) - buffer.copyToBuffer(newBuffer) - newBuffer - } - - def getSchedulerState(): MesosClusterSchedulerState = { - stateLock.synchronized { - new MesosClusterSchedulerState( - frameworkId, - masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"), - copyBuffer(queuedDrivers), - launchedDrivers.values.map(_.copy()).toList, - finishedDrivers.map(_.copy()).toList, - copyBuffer(pendingRetryDrivers)) - } - } - - override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {} - override def disconnected(driver: SchedulerDriver): Unit = {} - override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = { - logInfo(s"Framework re-registered with master ${masterInfo.getId}") - } - override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {} - override def error(driver: SchedulerDriver, error: String): Unit = { - logError("Error received: " + error) - markErr() - } - - /** - * Check if the task state is a recoverable state that we can relaunch the task. - * Task state like TASK_ERROR are not relaunchable state since it wasn't able - * to be validated by Mesos. - */ - private def shouldRelaunch(state: MesosTaskState): Boolean = { - state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || - state == MesosTaskState.TASK_LOST - } - - override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = { - val taskId = status.getTaskId.getValue - stateLock.synchronized { - if (launchedDrivers.contains(taskId)) { - if (status.getReason == Reason.REASON_RECONCILIATION && - !pendingRecover.contains(taskId)) { - // Task has already received update and no longer requires reconciliation. - return - } - val state = launchedDrivers(taskId) - // Check if the driver is supervise enabled and can be relaunched. - if (state.driverDescription.supervise && shouldRelaunch(status.getState)) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState - val (retries, waitTimeSec) = retryState - .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) } - .getOrElse{ (1, 1) } - val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L) - - val newDriverDescription = state.driverDescription.copy( - retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec))) - pendingRetryDrivers += newDriverDescription - pendingRetryDriversState.persist(taskId, newDriverDescription) - } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - if (finishedDrivers.size >= retainedDrivers) { - val toRemove = math.max(retainedDrivers / 10, 1) - finishedDrivers.trimStart(toRemove) - } - finishedDrivers += state - } - state.mesosTaskStatus = Option(status) - } else { - logError(s"Unable to find driver $taskId in status update") - } - } - } - - override def frameworkMessage( - driver: SchedulerDriver, - executorId: ExecutorID, - slaveId: SlaveID, - message: Array[Byte]): Unit = {} - - override def executorLost( - driver: SchedulerDriver, - executorId: ExecutorID, - slaveId: SlaveID, - status: Int): Unit = {} - - private def removeFromQueuedDrivers(id: String): Boolean = { - val index = queuedDrivers.indexWhere(_.submissionId.equals(id)) - if (index != -1) { - queuedDrivers.remove(index) - queuedDriversState.expunge(id) - true - } else { - false - } - } - - private def removeFromLaunchedDrivers(id: String): Boolean = { - if (launchedDrivers.remove(id).isDefined) { - launchedDriversState.expunge(id) - true - } else { - false - } - } - - private def removeFromPendingRetryDrivers(id: String): Boolean = { - val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id)) - if (index != -1) { - pendingRetryDrivers.remove(index) - pendingRetryDriversState.expunge(id) - true - } else { - false - } - } - - def getQueuedDriversSize: Int = queuedDrivers.size - def getLaunchedDriversSize: Int = launchedDrivers.size - def getPendingRetryDriversSize: Int = pendingRetryDrivers.size -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala deleted file mode 100644 index 1fe94974c8..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import com.codahale.metrics.{Gauge, MetricRegistry} - -import org.apache.spark.metrics.source.Source - -private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler) - extends Source { - override def sourceName: String = "mesos_cluster" - override def metricRegistry: MetricRegistry = new MetricRegistry() - - metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] { - override def getValue: Int = scheduler.getQueuedDriversSize - }) - - metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] { - override def getValue: Int = scheduler.getLaunchedDriversSize - }) - - metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] { - override def getValue: Int = scheduler.getPendingRetryDriversSize - }) -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala deleted file mode 100644 index 6b9313e5ed..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ /dev/null @@ -1,642 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{Collections, List => JList} -import java.util.concurrent.locks.ReentrantLock - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} - -import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} -import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.RpcEndpointAddress -import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils - -/** - * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds - * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever - * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the - * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable - * latency. - * - * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]], - * but it seems hard to remove this. - */ -private[spark] class MesosCoarseGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - master: String, - securityManager: SecurityManager) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with org.apache.mesos.Scheduler - with MesosSchedulerUtils { - - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures - - // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - - private[this] val shutdownTimeoutMS = - conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") - .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") - - // Synchronization protected by stateLock - private[this] var stopCalled: Boolean = false - - // If shuffle service is enabled, the Spark driver will register with the shuffle service. - // This is for cleaning up shuffle files reliably. - private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - - // Cores we have acquired with each Mesos task ID - val coresByTaskId = new mutable.HashMap[String, Int] - var totalCoresAcquired = 0 - - // SlaveID -> Slave - // This map accumulates entries for the duration of the job. Slaves are never deleted, because - // we need to maintain e.g. failure state and connection state. - private val slaves = new mutable.HashMap[String, Slave] - - /** - * The total number of executors we aim to have. Undefined when not using dynamic allocation. - * Initially set to 0 when using dynamic allocation, the executor allocation manager will send - * the real initial limit later. - */ - private var executorLimitOption: Option[Int] = { - if (Utils.isDynamicAllocationEnabled(conf)) { - Some(0) - } else { - None - } - } - - /** - * Return the current executor limit, which may be [[Int.MaxValue]] - * before properly initialized. - */ - private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue) - - // private lock object protecting mutable state above. Using the intrinsic lock - // may lead to deadlocks since the superclass might also try to lock - private val stateLock = new ReentrantLock - - val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) - - // Offer constraints - private val slaveOfferConstraints = - parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) - - // Reject offers with mismatched constraints in seconds - private val rejectOfferDurationForUnmetConstraints = - getRejectOfferDurationForUnmetConstraints(sc) - - // Reject offers when we reached the maximum number of cores for this framework - private val rejectOfferDurationForReachedMaxCores = - getRejectOfferDurationForReachedMaxCores(sc) - - // A client for talking to the external shuffle service - private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = { - if (shuffleServiceEnabled) { - Some(getShuffleClient()) - } else { - None - } - } - - // This method is factored out for testability - protected def getShuffleClient(): MesosExternalShuffleClient = { - new MesosExternalShuffleClient( - SparkTransportConf.fromSparkConf(conf, "shuffle"), - securityManager, - securityManager.isAuthenticationEnabled(), - securityManager.isSaslEncryptionEnabled()) - } - - var nextMesosTaskId = 0 - - @volatile var appId: String = _ - - def newMesosTaskId(): String = { - val id = nextMesosTaskId - nextMesosTaskId += 1 - id.toString - } - - override def start() { - super.start() - val driver = createSchedulerDriver( - master, - MesosCoarseGrainedSchedulerBackend.this, - sc.sparkUser, - sc.appName, - sc.conf, - sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)), - None, - None, - sc.conf.getOption("spark.mesos.driver.frameworkId") - ) - - unsetFrameworkID(sc) - startScheduler(driver) - } - - def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { - val environment = Environment.newBuilder() - val extraClassPath = conf.getOption("spark.executor.extraClassPath") - extraClassPath.foreach { cp => - environment.addVariables( - Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) - } - val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "") - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - - environment.addVariables( - Environment.Variable.newBuilder() - .setName("SPARK_EXECUTOR_OPTS") - .setValue(extraJavaOpts) - .build()) - - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - - val uri = conf.getOption("spark.executor.uri") - .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) - - if (uri.isEmpty) { - val executorSparkHome = conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val runScript = new File(executorSparkHome, "./bin/spark-class").getPath - command.setValue( - "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" - .format(prefixEnv, runScript) + - s" --driver-url $driverURL" + - s" --executor-id $taskId" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". - val basename = uri.get.split('/').last.split('.').head - command.setValue( - s"cd $basename*; $prefixEnv " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + - s" --driver-url $driverURL" + - s" --executor-id $taskId" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") - command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) - } - - conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) - - command.build() - } - - protected def driverURL: String = { - if (conf.contains("spark.testing")) { - "driverURL" - } else { - RpcEndpointAddress( - conf.get("spark.driver.host"), - conf.get("spark.driver.port").toInt, - CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - } - } - - override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} - - override def registered( - d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - appId = frameworkId.getValue - mesosExternalShuffleClient.foreach(_.init(appId)) - markRegistered() - } - - override def sufficientResourcesRegistered(): Boolean = { - totalCoresAcquired >= maxCores * minRegisteredRatio - } - - override def disconnected(d: org.apache.mesos.SchedulerDriver) {} - - override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} - - /** - * Method called by Mesos to offer resources on slaves. We respond by launching an executor, - * unless we've already launched more than we wanted to. - */ - override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) { - stateLock.synchronized { - if (stopCalled) { - logDebug("Ignoring offers during shutdown") - // Driver should simply return a stopped status on race - // condition between this.stop() and completing here - offers.asScala.map(_.getId).foreach(d.declineOffer) - return - } - - logDebug(s"Received ${offers.size} resource offers.") - - val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => - val offerAttributes = toAttributeMap(offer.getAttributesList) - matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) - } - - declineUnmatchedOffers(d, unmatchedOffers) - handleMatchedOffers(d, matchedOffers) - } - } - - private def declineUnmatchedOffers( - d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { - offers.foreach { offer => - declineOffer(d, offer, Some("unmet constraints"), - Some(rejectOfferDurationForUnmetConstraints)) - } - } - - private def declineOffer( - d: org.apache.mesos.SchedulerDriver, - offer: Offer, - reason: Option[String] = None, - refuseSeconds: Option[Long] = None): Unit = { - - val id = offer.getId.getValue - val offerAttributes = toAttributeMap(offer.getAttributesList) - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, "cpus") - val ports = getRangeResource(offer.getResourcesList, "ports") - - logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" + - s" cpu: $cpus port: $ports for $refuseSeconds seconds" + - reason.map(r => s" (reason: $r)").getOrElse("")) - - refuseSeconds match { - case Some(seconds) => - val filters = Filters.newBuilder().setRefuseSeconds(seconds).build() - d.declineOffer(offer.getId, filters) - case _ => d.declineOffer(offer.getId) - } - } - - /** - * Launches executors on accepted offers, and declines unused offers. Executors are launched - * round-robin on offers. - * - * @param d SchedulerDriver - * @param offers Mesos offers that match attribute constraints - */ - private def handleMatchedOffers( - d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { - val tasks = buildMesosTasks(offers) - for (offer <- offers) { - val offerAttributes = toAttributeMap(offer.getAttributesList) - val offerMem = getResource(offer.getResourcesList, "mem") - val offerCpus = getResource(offer.getResourcesList, "cpus") - val offerPorts = getRangeResource(offer.getResourcesList, "ports") - val id = offer.getId.getValue - - if (tasks.contains(offer.getId)) { // accept - val offerTasks = tasks(offer.getId) - - logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + - s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." + - s" Launching ${offerTasks.size} Mesos tasks.") - - for (task <- offerTasks) { - val taskId = task.getTaskId - val mem = getResource(task.getResourcesList, "mem") - val cpus = getResource(task.getResourcesList, "cpus") - val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") - - logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + - s" ports: $ports") - } - - d.launchTasks( - Collections.singleton(offer.getId), - offerTasks.asJava) - } else if (totalCoresAcquired >= maxCores) { - // Reject an offer for a configurable amount of time to avoid starving other frameworks - declineOffer(d, offer, Some("reached spark.cores.max"), - Some(rejectOfferDurationForReachedMaxCores)) - } else { - declineOffer(d, offer) - } - } - } - - /** - * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize - * per-task memory and IO, tasks are round-robin assigned to offers. - * - * @param offers Mesos offers that match attribute constraints - * @return A map from OfferID to a list of Mesos tasks to launch on that offer - */ - private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { - // offerID -> tasks - val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) - - // offerID -> resources - val remainingResources = mutable.Map(offers.map(offer => - (offer.getId.getValue, offer.getResourcesList)): _*) - - var launchTasks = true - - // TODO(mgummelt): combine offers for a single slave - // - // round-robin create executors on the available offers - while (launchTasks) { - launchTasks = false - - for (offer <- offers) { - val slaveId = offer.getSlaveId.getValue - val offerId = offer.getId.getValue - val resources = remainingResources(offerId) - - if (canLaunchTask(slaveId, resources)) { - // Create a task - launchTasks = true - val taskId = newMesosTaskId() - val offerCPUs = getResource(resources, "cpus").toInt - - val taskCPUs = executorCores(offerCPUs) - val taskMemory = executorMemory(sc) - - slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) - - val (resourcesLeft, resourcesToUse) = - partitionTaskResources(resources, taskCPUs, taskMemory) - - val taskBuilder = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) - .setName("Task " + taskId) - - taskBuilder.addAllResources(resourcesToUse.asJava) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - taskBuilder.getContainerBuilder - ) - } - - tasks(offer.getId) ::= taskBuilder.build() - remainingResources(offerId) = resourcesLeft.asJava - totalCoresAcquired += taskCPUs - coresByTaskId(taskId) = taskCPUs - } - } - } - tasks.toMap - } - - /** Extracts task needed resources from a list of available resources. */ - private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int) - : (List[Resource], List[Resource]) = { - - // partition cpus & mem - val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs) - val (afterMemResources, memResourcesToUse) = - partitionResources(afterCPUResources.asJava, "mem", taskMemory) - - // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched - // on the same host. This essentially means one executor per host. - // TODO: handle network isolator case - val (nonPortResources, portResourcesToUse) = - partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources) - - (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse) - } - - private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { - val offerMem = getResource(resources, "mem") - val offerCPUs = getResource(resources, "cpus").toInt - val cpus = executorCores(offerCPUs) - val mem = executorMemory(sc) - val ports = getRangeResource(resources, "ports") - val meetsPortRequirements = checkPorts(sc.conf, ports) - - cpus > 0 && - cpus <= offerCPUs && - cpus + totalCoresAcquired <= maxCores && - mem <= offerMem && - numExecutors() < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && - meetsPortRequirements - } - - private def executorCores(offerCPUs: Int): Int = { - sc.conf.getInt("spark.executor.cores", - math.min(offerCPUs, maxCores - totalCoresAcquired)) - } - - override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue - val slaveId = status.getSlaveId.getValue - val state = TaskState.fromMesos(status.getState) - - logInfo(s"Mesos task $taskId is now ${status.getState}") - - stateLock.synchronized { - val slave = slaves(slaveId) - - // If the shuffle service is enabled, have the driver register with each one of the - // shuffle services. This allows the shuffle services to clean up state associated with - // this application when the driver exits. There is currently not a great way to detect - // this through Mesos, since the shuffle services are set up independently. - if (state.equals(TaskState.RUNNING) && - shuffleServiceEnabled && - !slave.shuffleRegistered) { - assume(mesosExternalShuffleClient.isDefined, - "External shuffle client was not instantiated even though shuffle service is enabled.") - // TODO: Remove this and allow the MesosExternalShuffleService to detect - // framework termination when new Mesos Framework HTTP API is available. - val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) - - logDebug(s"Connecting to shuffle service on slave $slaveId, " + - s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}") - - mesosExternalShuffleClient.get - .registerDriverWithShuffleService( - slave.hostname, - externalShufflePort, - sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"), - sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) - slave.shuffleRegistered = true - } - - if (TaskState.isFinished(state)) { - // Remove the cores we have remembered for this task, if it's in the hashmap - for (cores <- coresByTaskId.get(taskId)) { - totalCoresAcquired -= cores - coresByTaskId -= taskId - } - // If it was a failure, mark the slave as failed for blacklisting purposes - if (TaskState.isFailed(state)) { - slave.taskFailures += 1 - - if (slave.taskFailures >= MAX_SLAVE_FAILURES) { - logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + - "is Spark installed on it?") - } - } - executorTerminated(d, slaveId, taskId, s"Executor finished with state $state") - // In case we'd rejected everything before but have now lost a node - d.reviveOffers() - } - } - } - - override def error(d: org.apache.mesos.SchedulerDriver, message: String) { - logError(s"Mesos error: $message") - scheduler.error(message) - } - - override def stop() { - // Make sure we're not launching tasks during shutdown - stateLock.synchronized { - if (stopCalled) { - logWarning("Stop called multiple times, ignoring") - return - } - stopCalled = true - super.stop() - } - - // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them. - // See SPARK-12330 - val startTime = System.nanoTime() - - // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent - while (numExecutors() > 0 && - System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) { - Thread.sleep(100) - } - - if (numExecutors() > 0) { - logWarning(s"Timed out waiting for ${numExecutors()} remaining executors " - + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files " - + "on the mesos nodes.") - } - - // Close the mesos external shuffle client if used - mesosExternalShuffleClient.foreach(_.close()) - - if (mesosDriver != null) { - mesosDriver.stop() - } - } - - override def frameworkMessage( - d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - /** - * Called when a slave is lost or a Mesos task finished. Updates local view on - * what tasks are running. It also notifies the driver that an executor was removed. - */ - private def executorTerminated( - d: org.apache.mesos.SchedulerDriver, - slaveId: String, - taskId: String, - reason: String): Unit = { - stateLock.synchronized { - // Do not call removeExecutor() after this scheduler backend was stopped because - // removeExecutor() internally will send a message to the driver endpoint but - // the driver endpoint is not available now, otherwise an exception will be thrown. - if (!stopCalled) { - removeExecutor(taskId, SlaveLost(reason)) - } - slaves(slaveId).taskIDs.remove(taskId) - } - } - - override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = { - logInfo(s"Mesos slave lost: ${slaveId.getValue}") - } - - override def executorLost( - d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { - logInfo("Mesos executor lost: %s".format(e.getValue)) - } - - override def applicationId(): String = - Option(appId).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - - override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - // We don't truly know if we can fulfill the full amount of executors - // since at coarse grain it depends on the amount of slaves available. - logInfo("Capping the total amount of executors to " + requestedTotal) - executorLimitOption = Some(requestedTotal) - true - } - - override def doKillExecutors(executorIds: Seq[String]): Boolean = { - if (mesosDriver == null) { - logWarning("Asked to kill executors before the Mesos driver was started.") - false - } else { - for (executorId <- executorIds) { - val taskId = TaskID.newBuilder().setValue(executorId).build() - mesosDriver.killTask(taskId) - } - // no need to adjust `executorLimitOption` since the AllocationManager already communicated - // the desired limit through a call to `doRequestTotalExecutors`. - // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] - true - } - } - - private def numExecutors(): Int = { - slaves.values.map(_.taskIDs.size).sum - } -} - -private class Slave(val hostname: String) { - val taskIDs = new mutable.HashSet[String]() - var taskFailures = 0 - var shuffleRegistered = false -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala deleted file mode 100644 index f1e48fa7c5..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{ArrayList => JArrayList, Collections, List => JList} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, HashSet} - -import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.protobuf.ByteString - -import org.apache.spark.{SparkContext, SparkException, TaskState} -import org.apache.spark.executor.MesosExecutorBackend -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.util.Utils - -/** - * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a - * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks - * from multiple apps can run on different cores) and in time (a core can switch ownership). - */ -private[spark] class MesosFineGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - master: String) - extends SchedulerBackend - with org.apache.mesos.Scheduler - with MesosSchedulerUtils { - - // Stores the slave ids that has launched a Mesos executor. - val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo] - val taskIdToSlaveId = new HashMap[Long, String] - - // An ExecutorInfo for our tasks - var execArgs: Array[Byte] = null - - var classLoader: ClassLoader = null - - // The listener bus to publish executor added/removed events. - val listenerBus = sc.listenerBus - - private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) - - // Offer constraints - private[this] val slaveOfferConstraints = - parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) - - // reject offers with mismatched constraints in seconds - private val rejectOfferDurationForUnmetConstraints = - getRejectOfferDurationForUnmetConstraints(sc) - - @volatile var appId: String = _ - - override def start() { - classLoader = Thread.currentThread.getContextClassLoader - val driver = createSchedulerDriver( - master, - MesosFineGrainedSchedulerBackend.this, - sc.sparkUser, - sc.appName, - sc.conf, - sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)), - Option.empty, - Option.empty, - sc.conf.getOption("spark.mesos.driver.frameworkId") - ) - - unsetFrameworkID(sc) - startScheduler(driver) - } - - /** - * Creates a MesosExecutorInfo that is used to launch a Mesos executor. - * @param availableResources Available resources that is offered by Mesos - * @param execId The executor id to assign to this new executor. - * @return A tuple of the new mesos executor info and the remaining available resources. - */ - def createExecutorInfo( - availableResources: JList[Resource], - execId: String): (MesosExecutorInfo, JList[Resource]) = { - val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val environment = Environment.newBuilder() - sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => - environment.addVariables( - Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) - } - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("") - - val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - - environment.addVariables( - Environment.Variable.newBuilder() - .setName("SPARK_EXECUTOR_OPTS") - .setValue(extraJavaOpts) - .build()) - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - val uri = sc.conf.getOption("spark.executor.uri") - .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) - - val executorBackendName = classOf[MesosExecutorBackend].getName - if (uri.isEmpty) { - val executorPath = new File(executorSparkHome, "/bin/spark-class").getPath - command.setValue(s"$prefixEnv $executorPath $executorBackendName") - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". - val basename = uri.get.split('/').last.split('.').head - command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName") - command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) - } - val builder = MesosExecutorInfo.newBuilder() - val (resourcesAfterCpu, usedCpuResources) = - partitionResources(availableResources, "cpus", mesosExecutorCores) - val (resourcesAfterMem, usedMemResources) = - partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc)) - - builder.addAllResources(usedCpuResources.asJava) - builder.addAllResources(usedMemResources.asJava) - - sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) - - val executorInfo = builder - .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) - .setCommand(command) - .setData(ByteString.copyFrom(createExecArg())) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - executorInfo.getContainerBuilder() - ) - } - - (executorInfo.build(), resourcesAfterMem.asJava) - } - - /** - * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array - * containing all the spark.* system properties in the form of (String, String) pairs. - */ - private def createExecArg(): Array[Byte] = { - if (execArgs == null) { - val props = new HashMap[String, String] - for ((key, value) <- sc.conf.getAll) { - props(key) = value - } - // Serialize the map as an array of (String, String) pairs - execArgs = Utils.serialize(props.toArray) - } - execArgs - } - - override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} - - override def registered( - d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - inClassLoader() { - appId = frameworkId.getValue - logInfo("Registered as framework ID " + appId) - markRegistered() - } - } - - private def inClassLoader()(fun: => Unit) = { - val oldClassLoader = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(classLoader) - try { - fun - } finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } - } - - override def disconnected(d: org.apache.mesos.SchedulerDriver) {} - - override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} - - private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = { - val builder = new StringBuilder - tasks.asScala.foreach { t => - builder.append("Task id: ").append(t.getTaskId.getValue).append("\n") - .append("Slave id: ").append(t.getSlaveId.getValue).append("\n") - .append("Task resources: ").append(t.getResourcesList).append("\n") - .append("Executor resources: ").append(t.getExecutor.getResourcesList) - .append("---------------------------------------------\n") - } - builder.toString() - } - - /** - * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets - * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that - * tasks are balanced across the cluster. - */ - override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) { - inClassLoader() { - // Fail first on offers with unmet constraints - val (offersMatchingConstraints, offersNotMatchingConstraints) = - offers.asScala.partition { o => - val offerAttributes = toAttributeMap(o.getAttributesList) - val meetsConstraints = - matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) - - // add some debug messaging - if (!meetsConstraints) { - val id = o.getId.getValue - logDebug(s"Declining offer: $id with attributes: $offerAttributes") - } - - meetsConstraints - } - - // These offers do not meet constraints. We don't need to see them again. - // Decline the offer for a long period of time. - offersNotMatchingConstraints.foreach { o => - d.declineOffer(o.getId, Filters.newBuilder() - .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()) - } - - // Of the matching constraints, see which ones give us enough memory and cores - val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o => - val mem = getResource(o.getResourcesList, "mem") - val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - val offerAttributes = toAttributeMap(o.getAttributesList) - - // check offers for - // 1. Memory requirements - // 2. CPU requirements - need at least 1 for executor, 1 for task - val meetsMemoryRequirements = mem >= executorMemory(sc) - val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - val meetsRequirements = - (meetsMemoryRequirements && meetsCPURequirements) || - (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) - val debugstr = if (meetsRequirements) "Accepting" else "Declining" - logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " - + s"$offerAttributes mem: $mem cpu: $cpus") - - meetsRequirements - } - - // Decline offers we ruled out immediately - unUsableOffers.foreach(o => d.declineOffer(o.getId)) - - val workerOffers = usableOffers.map { o => - val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) { - getResource(o.getResourcesList, "cpus").toInt - } else { - // If the Mesos executor has not been started on this slave yet, set aside a few - // cores for the Mesos executor by offering fewer cores to the Spark executor - (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt - } - new WorkerOffer( - o.getSlaveId.getValue, - o.getHostname, - cpus) - } - - val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap - val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap - val slaveIdToResources = new HashMap[String, JList[Resource]]() - usableOffers.foreach { o => - slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList - } - - val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] - - val slavesIdsOfAcceptedOffers = HashSet[String]() - - // Call into the TaskSchedulerImpl - val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) - acceptedOffers - .foreach { offer => - offer.foreach { taskDesc => - val slaveId = taskDesc.executorId - slavesIdsOfAcceptedOffers += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - val (mesosTask, remainingResources) = createMesosTask( - taskDesc, - slaveIdToResources(slaveId), - slaveId) - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(mesosTask) - slaveIdToResources(slaveId) = remainingResources - } - } - - // Reply to the offers - val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - - mesosTasks.foreach { case (slaveId, tasks) => - slaveIdToWorkerOffer.get(slaveId).foreach(o => - listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, - // TODO: Add support for log urls for Mesos - new ExecutorInfo(o.host, o.cores, Map.empty))) - ) - logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") - d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) - } - - // Decline offers that weren't used - // NOTE: This logic assumes that we only get a single offer for each host in a given batch - for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) { - d.declineOffer(o.getId) - } - } - } - - /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */ - def createMesosTask( - task: TaskDescription, - resources: JList[Resource], - slaveId: String): (MesosTaskInfo, JList[Resource]) = { - val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() - val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) { - (slaveIdToExecutorInfo(slaveId), resources) - } else { - createExecutorInfo(resources, slaveId) - } - slaveIdToExecutorInfo(slaveId) = executorInfo - val (finalResources, cpuResources) = - partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) - val taskInfo = MesosTaskInfo.newBuilder() - .setTaskId(taskId) - .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(executorInfo) - .setName(task.name) - .addAllResources(cpuResources.asJava) - .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) - .build() - (taskInfo, finalResources.asJava) - } - - override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { - inClassLoader() { - val tid = status.getTaskId.getValue.toLong - val state = TaskState.fromMesos(status.getState) - synchronized { - if (TaskState.isFailed(TaskState.fromMesos(status.getState)) - && taskIdToSlaveId.contains(tid)) { - // We lost the executor on this slave, so remember that it's gone - removeExecutor(taskIdToSlaveId(tid), "Lost executor") - } - if (TaskState.isFinished(state)) { - taskIdToSlaveId.remove(tid) - } - } - scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) - } - } - - override def error(d: org.apache.mesos.SchedulerDriver, message: String) { - inClassLoader() { - logError("Mesos error: " + message) - markErr() - scheduler.error(message) - } - } - - override def stop() { - if (mesosDriver != null) { - mesosDriver.stop() - } - } - - override def reviveOffers() { - mesosDriver.reviveOffers() - } - - override def frameworkMessage( - d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - /** - * Remove executor associated with slaveId in a thread safe manner. - */ - private def removeExecutor(slaveId: String, reason: String) = { - synchronized { - listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason)) - slaveIdToExecutorInfo -= slaveId - } - } - - private def recordSlaveLost( - d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { - inClassLoader() { - logInfo("Mesos slave lost: " + slaveId.getValue) - removeExecutor(slaveId.getValue, reason.toString) - scheduler.executorLost(slaveId.getValue, reason) - } - } - - override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID) { - recordSlaveLost(d, slaveId, SlaveLost()) - } - - override def executorLost( - d: org.apache.mesos.SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) { - logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, - slaveId.getValue)) - recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true)) - } - - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { - mesosDriver.killTask( - TaskID.newBuilder() - .setValue(taskId.toString).build() - ) - } - - // TODO: query Mesos for number of cores - override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8) - - override def applicationId(): String = - Option(appId).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala deleted file mode 100644 index 3fe06743b8..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import org.apache.mesos.Protos.{ContainerInfo, Image, Volume} -import org.apache.mesos.Protos.ContainerInfo.DockerInfo - -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.internal.Logging - -/** - * A collection of utility functions which can be used by both the - * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]]. - */ -private[mesos] object MesosSchedulerBackendUtil extends Logging { - /** - * Parse a comma-delimited list of volume specs, each of which - * takes the form [host-dir:]container-dir[:rw|:ro]. - */ - def parseVolumesSpec(volumes: String): List[Volume] = { - volumes.split(",").map(_.split(":")).flatMap { spec => - val vol: Volume.Builder = Volume - .newBuilder() - .setMode(Volume.Mode.RW) - spec match { - case Array(container_path) => - Some(vol.setContainerPath(container_path)) - case Array(container_path, "rw") => - Some(vol.setContainerPath(container_path)) - case Array(container_path, "ro") => - Some(vol.setContainerPath(container_path) - .setMode(Volume.Mode.RO)) - case Array(host_path, container_path) => - Some(vol.setContainerPath(container_path) - .setHostPath(host_path)) - case Array(host_path, container_path, "rw") => - Some(vol.setContainerPath(container_path) - .setHostPath(host_path)) - case Array(host_path, container_path, "ro") => - Some(vol.setContainerPath(container_path) - .setHostPath(host_path) - .setMode(Volume.Mode.RO)) - case spec => - logWarning(s"Unable to parse volume specs: $volumes. " - + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"") - None - } - } - .map { _.build() } - .toList - } - - /** - * Parse a comma-delimited list of port mapping specs, each of which - * takes the form host_port:container_port[:udp|:tcp] - * - * Note: - * the docker form is [ip:]host_port:container_port, but the DockerInfo - * message has no field for 'ip', and instead has a 'protocol' field. - * Docker itself only appears to support TCP, so this alternative form - * anticipates the expansion of the docker form to allow for a protocol - * and leaves open the chance for mesos to begin to accept an 'ip' field - */ - def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = { - portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] => - val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping - .newBuilder() - .setProtocol("tcp") - spec match { - case Array(host_port, container_port) => - Some(portmap.setHostPort(host_port.toInt) - .setContainerPort(container_port.toInt)) - case Array(host_port, container_port, protocol) => - Some(portmap.setHostPort(host_port.toInt) - .setContainerPort(container_port.toInt) - .setProtocol(protocol)) - case spec => - logWarning(s"Unable to parse port mapping specs: $portmaps. " - + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"") - None - } - } - .map { _.build() } - .toList - } - - /** - * Construct a DockerInfo structure and insert it into a ContainerInfo - */ - def addDockerInfo( - container: ContainerInfo.Builder, - image: String, - containerizer: String, - forcePullImage: Boolean = false, - volumes: Option[List[Volume]] = None, - portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = { - - containerizer match { - case "docker" => - container.setType(ContainerInfo.Type.DOCKER) - val docker = ContainerInfo.DockerInfo.newBuilder() - .setImage(image) - .setForcePullImage(forcePullImage) - // TODO (mgummelt): Remove this. Portmaps have no effect, - // as we don't support bridge networking. - portmaps.foreach(_.foreach(docker.addPortMappings)) - container.setDocker(docker) - case "mesos" => - container.setType(ContainerInfo.Type.MESOS) - val imageProto = Image.newBuilder() - .setType(Image.Type.DOCKER) - .setDocker(Image.Docker.newBuilder().setName(image)) - .setCached(!forcePullImage) - container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto)) - case _ => - throw new SparkException( - "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}") - } - - volumes.foreach(_.foreach(container.addVolumes)) - } - - /** - * Setup a docker containerizer from MesosDriverDescription scheduler properties - */ - def setupContainerBuilderDockerInfo( - imageName: String, - conf: SparkConf, - builder: ContainerInfo.Builder): Unit = { - val forcePullImage = conf - .getOption("spark.mesos.executor.docker.forcePullImage") - .exists(_.equals("true")) - val volumes = conf - .getOption("spark.mesos.executor.docker.volumes") - .map(parseVolumesSpec) - val portmaps = conf - .getOption("spark.mesos.executor.docker.portmaps") - .map(parsePortMappingsSpec) - - val containerizer = conf.get("spark.mesos.containerizer", "docker") - addDockerInfo( - builder, - imageName, - containerizer, - forcePullImage = forcePullImage, - volumes = volumes, - portmaps = portmaps) - logDebug("setupContainerDockerInfo: using docker image: " + imageName) - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala deleted file mode 100644 index 1bbede1853..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ /dev/null @@ -1,494 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.util.{List => JList} -import java.util.concurrent.CountDownLatch - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.util.control.NonFatal - -import com.google.common.base.Splitter -import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} -import org.apache.mesos.Protos._ -import org.apache.mesos.protobuf.{ByteString, GeneratedMessage} - -import org.apache.spark.{SparkConf, SparkContext, SparkException} -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - - -/** - * Shared trait for implementing a Mesos Scheduler. This holds common state and helper - * methods and Mesos scheduler will use. - */ -private[mesos] trait MesosSchedulerUtils extends Logging { - // Lock used to wait for scheduler to be registered - private final val registerLatch = new CountDownLatch(1) - - // Driver for talking to Mesos - protected var mesosDriver: SchedulerDriver = null - - /** - * Creates a new MesosSchedulerDriver that communicates to the Mesos master. - * - * @param masterUrl The url to connect to Mesos master - * @param scheduler the scheduler class to receive scheduler callbacks - * @param sparkUser User to impersonate with when running tasks - * @param appName The framework name to display on the Mesos UI - * @param conf Spark configuration - * @param webuiUrl The WebUI url to link from Mesos UI - * @param checkpoint Option to checkpoint tasks for failover - * @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect - * @param frameworkId The id of the new framework - */ - protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { - val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName) - val credBuilder = Credential.newBuilder() - webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) } - checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) } - failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) } - frameworkId.foreach { id => - fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build()) - } - conf.getOption("spark.mesos.principal").foreach { principal => - fwInfoBuilder.setPrincipal(principal) - credBuilder.setPrincipal(principal) - } - conf.getOption("spark.mesos.secret").foreach { secret => - credBuilder.setSecret(secret) - } - if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) { - throw new SparkException( - "spark.mesos.principal must be configured when spark.mesos.secret is set") - } - conf.getOption("spark.mesos.role").foreach { role => - fwInfoBuilder.setRole(role) - } - if (credBuilder.hasPrincipal) { - new MesosSchedulerDriver( - scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build()) - } else { - new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl) - } - } - - /** - * Starts the MesosSchedulerDriver and stores the current running driver to this new instance. - * This driver is expected to not be running. - * This method returns only after the scheduler has registered with Mesos. - */ - def startScheduler(newDriver: SchedulerDriver): Unit = { - synchronized { - if (mesosDriver != null) { - registerLatch.await() - return - } - @volatile - var error: Option[Exception] = None - - // We create a new thread that will block inside `mesosDriver.run` - // until the scheduler exists - new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") { - setDaemon(true) - override def run() { - try { - mesosDriver = newDriver - val ret = mesosDriver.run() - logInfo("driver.run() returned with code " + ret) - if (ret != null && ret.equals(Status.DRIVER_ABORTED)) { - error = Some(new SparkException("Error starting driver, DRIVER_ABORTED")) - markErr() - } - } catch { - case e: Exception => - logError("driver.run() failed", e) - error = Some(e) - markErr() - } - } - }.start() - - registerLatch.await() - - // propagate any error to the calling thread. This ensures that SparkContext creation fails - // without leaving a broken context that won't be able to schedule any tasks - error.foreach(throw _) - } - } - - def getResource(res: JList[Resource], name: String): Double = { - // A resource can have multiple values in the offer since it can either be from - // a specific role or wildcard. - res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum - } - - /** - * Transforms a range resource to a list of ranges - * - * @param res the mesos resource list - * @param name the name of the resource - * @return the list of ranges returned - */ - protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = { - // A resource can have multiple values in the offer since it can either be from - // a specific role or wildcard. - res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala - .map(r => (r.getBegin, r.getEnd)).toList).toList - } - - /** - * Signal that the scheduler has registered with Mesos. - */ - protected def markRegistered(): Unit = { - registerLatch.countDown() - } - - protected def markErr(): Unit = { - registerLatch.countDown() - } - - def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { - val builder = Resource.newBuilder() - .setName(name) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(amount).build()) - - role.foreach { r => builder.setRole(r) } - - builder.build() - } - - /** - * Partition the existing set of resources into two groups, those remaining to be - * scheduled and those requested to be used for a new task. - * - * @param resources The full list of available resources - * @param resourceName The name of the resource to take from the available resources - * @param amountToUse The amount of resources to take from the available resources - * @return The remaining resources list and the used resources list. - */ - def partitionResources( - resources: JList[Resource], - resourceName: String, - amountToUse: Double): (List[Resource], List[Resource]) = { - var remain = amountToUse - var requestedResources = new ArrayBuffer[Resource] - val remainingResources = resources.asScala.map { - case r => - if (remain > 0 && - r.getType == Value.Type.SCALAR && - r.getScalar.getValue > 0.0 && - r.getName == resourceName) { - val usage = Math.min(remain, r.getScalar.getValue) - requestedResources += createResource(resourceName, usage, Some(r.getRole)) - remain -= usage - createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole)) - } else { - r - } - } - - // Filter any resource that has depleted. - val filteredResources = - remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0) - - (filteredResources.toList, requestedResources.toList) - } - - /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ - protected def getAttribute(attr: Attribute): (String, Set[String]) = { - (attr.getName, attr.getText.getValue.split(',').toSet) - } - - - /** Build a Mesos resource protobuf object */ - protected def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } - - /** - * Converts the attributes from the resource offer into a Map of name -> Attribute Value - * The attribute values are the mesos attribute types and they are - * - * @param offerAttributes the attributes offered - * @return - */ - protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { - offerAttributes.asScala.map { attr => - val attrValue = attr.getType match { - case Value.Type.SCALAR => attr.getScalar - case Value.Type.RANGES => attr.getRanges - case Value.Type.SET => attr.getSet - case Value.Type.TEXT => attr.getText - } - (attr.getName, attrValue) - }.toMap - } - - - /** - * Match the requirements (if any) to the offer attributes. - * if attribute requirements are not specified - return true - * else if attribute is defined and no values are given, simple attribute presence is performed - * else if attribute name and value is specified, subset match is performed on slave attributes - */ - def matchesAttributeRequirements( - slaveOfferConstraints: Map[String, Set[String]], - offerAttributes: Map[String, GeneratedMessage]): Boolean = { - slaveOfferConstraints.forall { - // offer has the required attribute and subsumes the required values for that attribute - case (name, requiredValues) => - offerAttributes.get(name) match { - case None => false - case Some(_) if requiredValues.isEmpty => true // empty value matches presence - case Some(scalarValue: Value.Scalar) => - // check if provided values is less than equal to the offered values - requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue) - case Some(rangeValue: Value.Range) => - val offerRange = rangeValue.getBegin to rangeValue.getEnd - // Check if there is some required value that is between the ranges specified - // Note: We only support the ability to specify discrete values, in the future - // we may expand it to subsume ranges specified with a XX..YY value or something - // similar to that. - requiredValues.map(_.toLong).exists(offerRange.contains(_)) - case Some(offeredValue: Value.Set) => - // check if the specified required values is a subset of offered set - requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet) - case Some(textValue: Value.Text) => - // check if the specified value is equal, if multiple values are specified - // we succeed if any of them match. - requiredValues.contains(textValue.getValue) - } - } - } - - /** - * Parses the attributes constraints provided to spark and build a matching data struct: - * Map[<attribute-name>, Set[values-to-match]] - * The constraints are specified as ';' separated key-value pairs where keys and values - * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for - * multiple values (comma separated). For example: - * {{{ - * parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") - * // would result in - * <code> - * Map( - * "os" -> Set("centos7"), - * "zone": -> Set("us-east-1a", "us-east-1b") - * ) - * }}} - * - * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/ - * https://github.com/apache/mesos/blob/master/src/common/values.cpp - * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp - * - * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated - * by ':') - * @return Map of constraints to match resources offers. - */ - def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = { - /* - Based on mesos docs: - attributes : attribute ( ";" attribute )* - attribute : labelString ":" ( labelString | "," )+ - labelString : [a-zA-Z0-9_/.-] - */ - val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':') - // kv splitter - if (constraintsVal.isEmpty) { - Map() - } else { - try { - splitter.split(constraintsVal).asScala.toMap.mapValues(v => - if (v == null || v.isEmpty) { - Set[String]() - } else { - v.split(',').toSet - } - ) - } catch { - case NonFatal(e) => - throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e) - } - } - } - - // These defaults copied from YARN - private val MEMORY_OVERHEAD_FRACTION = 0.10 - private val MEMORY_OVERHEAD_MINIMUM = 384 - - /** - * Return the amount of memory to allocate to each executor, taking into account - * container overheads. - * - * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value - * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM - * (whichever is larger) - */ - def executorMemory(sc: SparkContext): Int = { - sc.conf.getInt("spark.mesos.executor.memoryOverhead", - math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + - sc.executorMemory - } - - def setupUris(uris: String, builder: CommandInfo.Builder): Unit = { - uris.split(",").foreach { uri => - builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim())) - } - } - - protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = { - sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") - } - - protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = { - sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s") - } - - /** - * Checks executor ports if they are within some range of the offered list of ports ranges, - * - * @param conf the Spark Config - * @param ports the list of ports to check - * @return true if ports are within range false otherwise - */ - protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = { - - def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = { - ps.exists{case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port } - } - - val portsToCheck = nonZeroPortValuesFromConfig(conf) - val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports)) - // make sure we have enough ports to allocate per offer - val enoughPorts = - ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size - enoughPorts && withinRange - } - - /** - * Partitions port resources. - * - * @param requestedPorts non-zero ports to assign - * @param offeredResources the resources offered - * @return resources left, port resources to be used. - */ - def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource]) - : (List[Resource], List[Resource]) = { - if (requestedPorts.isEmpty) { - (offeredResources, List[Resource]()) - } else { - // partition port offers - val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources) - - val portsAndRoles = requestedPorts. - map(x => (x, findPortAndGetAssignedRangeRole(x, portResources))) - - val assignedPortResources = createResourcesFromPorts(portsAndRoles) - - // ignore non-assigned port resources, they will be declined implicitly by mesos - // no need for splitting port resources. - (resourcesWithoutPorts, assignedPortResources) - } - } - - val managedPortNames = List("spark.executor.port", "spark.blockManager.port") - - /** - * The values of the non-zero ports to be used by the executor process. - * @param conf the spark config to use - * @return the ono-zero values of the ports - */ - def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = { - managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0) - } - - /** Creates a mesos resource for a specific port number. */ - private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { - portsAndRoles.flatMap{ case (port, role) => - createMesosPortResource(List((port, port)), Some(role))} - } - - /** Helper to create mesos resources for specific port ranges. */ - private def createMesosPortResource( - ranges: List[(Long, Long)], - role: Option[String] = None): List[Resource] = { - ranges.map { case (rangeStart, rangeEnd) => - val rangeValue = Value.Range.newBuilder() - .setBegin(rangeStart) - .setEnd(rangeEnd) - val builder = Resource.newBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) - role.foreach(r => builder.setRole(r)) - builder.build() - } - } - - /** - * Helper to assign a port to an offered range and get the latter's role - * info to use it later on. - */ - private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource]) - : String = { - - val ranges = portResources. - map(resource => - (resource.getRole, resource.getRanges.getRangeList.asScala - .map(r => (r.getBegin, r.getEnd)).toList)) - - val rangePortRole = ranges - .find { case (role, rangeList) => rangeList - .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}} - // this is safe since we have previously checked about the ranges (see checkPorts method) - rangePortRole.map{ case (role, rangeList) => role}.get - } - - /** Retrieves the port resources from a list of mesos offered resources */ - private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = { - resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") } - } - - /** - * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver - * submissions with frameworkIDs. However, this causes issues when a driver process launches - * more than one framework (more than one SparkContext(, because they all try to register with - * the same frameworkID. To enforce that only the first driver registers with the configured - * framework ID, the driver calls this method after the first registration. - */ - def unsetFrameworkID(sc: SparkContext) { - sc.conf.remove("spark.mesos.driver.frameworkId") - System.clearProperty("spark.mesos.driver.frameworkId") - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala deleted file mode 100644 index 8370b61145..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.nio.ByteBuffer - -import org.apache.mesos.protobuf.ByteString - -import org.apache.spark.internal.Logging - -/** - * Wrapper for serializing the data sent when launching Mesos tasks. - */ -private[spark] case class MesosTaskLaunchData( - serializedTask: ByteBuffer, - attemptNumber: Int) extends Logging { - - def toByteString: ByteString = { - val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit) - dataBuffer.putInt(attemptNumber) - dataBuffer.put(serializedTask) - dataBuffer.rewind - logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]") - ByteString.copyFrom(dataBuffer) - } -} - -private[spark] object MesosTaskLaunchData extends Logging { - def fromByteString(byteString: ByteString): MesosTaskLaunchData = { - val byteBuffer = byteString.asReadOnlyByteBuffer() - logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]") - val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes - val serializedTask = byteBuffer.slice() // subsequence starting at the current position - MesosTaskLaunchData(serializedTask, attemptNumber) - } -} |