about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
author	Philipp Hoffmann <mail@philipphoffmann.de>	2016-07-25 20:14:47 +0100
committer	Sean Owen <sowen@cloudera.com>	2016-07-25 20:14:47 +0100
commit	978cd5f125eb5a410bad2e60bf8385b11cf1b978 (patch)
tree	c33a44dc7487259409a64eb515ae6c6e002c56fb /core
parent	dd784a8822497ad0631208d56325c4d74ab9e036 (diff)
download	spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.tar.gz
spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.tar.bz2
spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.zip
[SPARK-15271][MESOS] Allow force pulling executor docker images
## What changes were proposed in this pull request?

Mesos agents by default will not pull docker images which are already cached locally. In order to run Spark executors from mutable tags like `:latest`, this commit introduces a Spark setting, `spark.mesos.executor.docker.forcePullImage`. Setting this flag to `true` will tell the Mesos agent to force pull the docker image (the default is `false`, which is consistent with the previous implementation and Mesos' default behaviour).

## How was this patch tested?

I ran a sample application including this change on a Mesos cluster and verified the correct behaviour both with and without force pulling the executor image. As expected, the image is force pulled when the flag is set.

Author: Philipp Hoffmann <mail@philipphoffmann.de>

Closes #13051 from philipphoffmann/force-pull-image.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala63
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala2
6 files changed, 91 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d0e2..1e9644d06e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
- val container = taskInfo.getContainerBuilder()
- val volumes = submission.schedulerProperties
- .get("spark.mesos.executor.docker.volumes")
- .map(MesosSchedulerBackendUtil.parseVolumesSpec)
- val portmaps = submission.schedulerProperties
- .get("spark.mesos.executor.docker.portmaps")
- .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
- MesosSchedulerBackendUtil.addDockerInfo(
- container, image, volumes = volumes, portmaps = portmaps)
- taskInfo.setContainer(container.build())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ submission.schedulerProperties.get,
+ taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d39583..52993caad1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ taskBuilder.getContainerBuilder
+ )
}
tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b595..8d4fc9eed7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ executorInfo.getContainerBuilder()
+ )
}
(executorInfo.build(), resourcesAfterMem.asJava)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b08944..aa669f01bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
-import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
+ forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
- val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+ val docker = ContainerInfo.DockerInfo.newBuilder()
+ .setImage(image)
+ .setForcePullImage(forcePullImage)
network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
- * Setup a docker containerizer
+ * Setup a docker containerizer from MesosDriverDescription scheduler properties
*/
def setupContainerBuilderDockerInfo(
imageName: String,
- conf: SparkConf,
+ conf: String => Option[String],
builder: ContainerInfo.Builder): Unit = {
- val volumes = conf
- .getOption("spark.mesos.executor.docker.volumes")
+ val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+ .exists(_.equals("true"))
+ val volumes = conf("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
- val portmaps = conf
- .getOption("spark.mesos.executor.docker.portmaps")
+ val portmaps = conf("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
+
addDockerInfo(
builder,
imageName,
+ forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
logDebug("setupContainerDockerInfo: using docker image: " + imageName)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c2779d7b35..d3a85c654e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
+ test("docker settings are reflected in created tasks") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "some_image",
+ "spark.mesos.executor.docker.forcePullImage" -> "true",
+ "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+ "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched("o1").asScala
+ assert(launchedTasks.size == 1)
+
+ val containerInfo = launchedTasks.head.getContainer
+ assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+ val volumes = containerInfo.getVolumesList.asScala
+ assert(volumes.size == 1)
+
+ val volume = volumes.head
+ assert(volume.getHostPath == "/host_vol")
+ assert(volume.getContainerPath == "/container_vol")
+ assert(volume.getMode == Volume.Mode.RO)
+
+ val dockerInfo = containerInfo.getDocker
+
+ assert(dockerInfo.getImage == "some_image")
+ assert(dockerInfo.getForcePullImage)
+
+ val portMappings = dockerInfo.getPortMappingsList.asScala
+ assert(portMappings.size == 1)
+
+ val portMapping = portMappings.head
+ assert(portMapping.getHostPort == 8080)
+ assert(portMapping.getContainerPort == 80)
+ assert(portMapping.getProtocol == "tcp")
+ }
+
+ test("force-pull-image option is disabled by default") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "some_image"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched("o1").asScala
+ assert(launchedTasks.size == 1)
+
+ val containerInfo = launchedTasks.head.getContainer
+ assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+ val dockerInfo = containerInfo.getDocker
+
+ assert(dockerInfo.getImage == "some_image")
+ assert(!dockerInfo.getForcePullImage)
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 41693b1191..fcf39f6391 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
+ .set("spark.mesos.executor.docker.forcePullImage", "true")
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+ assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))