about summary refs log tree commit diff
path: root/resource-managers
diff options
context:
space:
mode:
authorMichael Gummelt <mgummelt@mesosphere.io>2017-03-01 00:10:55 +0100
committerSean Owen <sowen@cloudera.com>2017-03-01 00:10:55 +0100
commitca3864d6e090ca3e68a2ef0cf527e6e00c8c4f64 (patch)
tree4c6a890805872cf93dc1aa2bb7df63c1604e1768 /resource-managers
parentbf5987cbe6c9f4a1a91d912ed3a9098111632d1a (diff)
downloadspark-ca3864d6e090ca3e68a2ef0cf527e6e00c8c4f64.tar.gz
spark-ca3864d6e090ca3e68a2ef0cf527e6e00c8c4f64.tar.bz2
spark-ca3864d6e090ca3e68a2ef0cf527e6e00c8c4f64.zip
[SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio on registered cores rather than accepted cores
## What changes were proposed in this pull request?

See JIRA.

## How was this patch tested?

Unit tests, Mesos/Spark integration tests.

cc skonto susanxhuynh

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #17045 from mgummelt/SPARK-19373-registered-resources.
Diffstat (limited to 'resource-managers')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala27
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala111
2 files changed, 70 insertions, 68 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f555072c38..f69c223ab9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
with org.apache.mesos.Scheduler
with MesosSchedulerUtils {
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+ // Blacklist a slave after this many failures
+ private val MAX_SLAVE_FAILURES = 2
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
- val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+ // Maximum number of cores to acquire
+ private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
- val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+ private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
+ private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
@@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
// Cores we have acquired with each Mesos task ID
- val coresByTaskId = new mutable.HashMap[String, Int]
- val gpusByTaskId = new mutable.HashMap[String, Int]
- var totalCoresAcquired = 0
- var totalGpusAcquired = 0
+ private val coresByTaskId = new mutable.HashMap[String, Int]
+ private val gpusByTaskId = new mutable.HashMap[String, Int]
+ private var totalCoresAcquired = 0
+ private var totalGpusAcquired = 0
// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
- val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+ private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
// Offer constraints
private val slaveOfferConstraints =
@@ -139,7 +142,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
securityManager.isAuthenticationEnabled())
}
- var nextMesosTaskId = 0
+ private var nextMesosTaskId = 0
@volatile var appId: String = _
@@ -256,7 +259,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
override def sufficientResourcesRegistered(): Boolean = {
- totalCoresAcquired >= maxCores * minRegisteredRatio
+ totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
}
override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index cdb3b68489..78346e9744 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.concurrent.Promise
import scala.reflect.ClassTag
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
@@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("weburi is set in created scheduler driver") {
- setBackend()
+ initializeSparkConf()
+ sc = new SparkContext(sparkConf)
+
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
+
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
val securityManager = mock[SecurityManager]
val backend = new MesosCoarseGrainedSchedulerBackend(
- taskScheduler, sc, "master", securityManager) {
+ taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
- masterUrl: String,
- scheduler: Scheduler,
- sparkUser: String,
- appName: String,
- conf: SparkConf,
- webuiUrl: Option[String] = None,
- checkpoint: Option[Boolean] = None,
- failoverTimeout: Option[Double] = None,
- frameworkId: Option[String] = None): SchedulerDriver = {
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(webuiUrl.isDefined)
assert(webuiUrl.get.equals("http://webui"))
@@ -419,37 +421,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}
- test("Do not call removeExecutor() after backend is stopped") {
- setBackend()
-
- // launches a task on a valid offer
- val offers = List(Resources(backend.executorMemory(sc), 1))
- offerResources(offers)
- verifyTaskLaunched(driver, "o1")
-
- // launches a thread simulating status update
- val statusUpdateThread = new Thread {
- override def run(): Unit = {
- while (!stopCalled) {
- Thread.sleep(100)
- }
-
- val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
- backend.statusUpdate(driver, status)
- }
- }.start
-
- backend.stop()
- // Any method of the backend involving sending messages to the driver endpoint should not
- // be called after the backend is stopped.
- verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
- }
-
test("mesos supports spark.executor.uri") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.executor.uri" -> url
- ), false)
+ ), null)
val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -465,7 +441,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "true",
"spark.executor.uri" -> url
- ), false)
+ ), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -479,7 +455,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "false",
"spark.executor.uri" -> url
- ), false)
+ ), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -504,8 +480,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(networkInfos.get(0).getName == "test-network-name")
}
+ test("supports spark.scheduler.minRegisteredResourcesRatio") {
+ val expectedCores = 1
+ setBackend(Map(
+ "spark.cores.max" -> expectedCores.toString,
+ "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+
+ val offers = List(Resources(backend.executorMemory(sc), expectedCores))
+ offerResources(offers)
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ assert(!backend.isReady)
+
+ registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores)
+ assert(backend.isReady)
+ }
+
private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+ private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
+ val mockEndpointRef = mock[RpcEndpointRef]
+ val mockAddress = mock[RpcAddress]
+ val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty)
+
+ backend.driverEndpoint.askSync[Boolean](message)
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -534,8 +533,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver,
- shuffleClient: MesosExternalShuffleClient,
- endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
+ shuffleClient: MesosExternalShuffleClient) = {
val securityManager = mock[SecurityManager]
val backend = new MesosCoarseGrainedSchedulerBackend(
@@ -553,9 +551,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
- override protected def createDriverEndpointRef(
- properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
-
// override to avoid race condition with the driver thread on `mesosDriver`
override def startScheduler(newDriver: SchedulerDriver): Unit = {
mesosDriver = newDriver
@@ -571,31 +566,35 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend
}
- private def setBackend(sparkConfVars: Map[String, String] = null,
- setHome: Boolean = true) {
+ private def initializeSparkConf(
+ sparkConfVars: Map[String, String] = null,
+ home: String = "/path"): Unit = {
sparkConf = (new SparkConf)
.setMaster("local[*]")
.setAppName("test-mesos-dynamic-alloc")
.set("spark.mesos.driver.webui.url", "http://webui")
- if (setHome) {
- sparkConf.setSparkHome("/path")
+ if (home != null) {
+ sparkConf.setSparkHome(home)
}
if (sparkConfVars != null) {
sparkConf.setAll(sparkConfVars)
}
+ }
+ private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") {
+ initializeSparkConf(sparkConfVars, home)
sc = new SparkContext(sparkConf)
driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
+
externalShuffleClient = mock[MesosExternalShuffleClient]
- driverEndpoint = mock[RpcEndpointRef]
- when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future)
- backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
+ backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
}
}