diff options
author | Philipp Hoffmann <mail@philipphoffmann.de> | 2016-07-25 20:14:47 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-07-25 20:14:47 +0100 |
commit | 978cd5f125eb5a410bad2e60bf8385b11cf1b978 (patch) | |
tree | c33a44dc7487259409a64eb515ae6c6e002c56fb /core/src | |
parent | dd784a8822497ad0631208d56325c4d74ab9e036 (diff) | |
download | spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.tar.gz spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.tar.bz2 spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.zip |
[SPARK-15271][MESOS] Allow force pulling executor docker images
## What changes were proposed in this pull request?
Mesos agents by default will not pull docker images which are cached
locally already. In order to run Spark executors from mutable tags like
`:latest` this commit introduces a Spark setting
`spark.mesos.executor.docker.forcePullImage`. Setting this flag to
true will tell the Mesos agent to force pull the docker image (default is `false` which is consistent with the previous
implementation and Mesos' default
behaviour).
## How was this patch tested?
I ran a sample application including this change on a Mesos cluster and verified the correct behaviour for both, with and without, force pulling the executor image. As expected the image is being force pulled if the flag is set.
Author: Philipp Hoffmann <mail@philipphoffmann.de>
Closes #13051 from philipphoffmann/force-pull-image.
Diffstat (limited to 'core/src')
6 files changed, 91 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 39b0f4d0e2..1e9644d06e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler( .addAllResources(memResourcesToUse.asJava) offer.resources = finalResources.asJava submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image => - val container = taskInfo.getContainerBuilder() - val volumes = submission.schedulerProperties - .get("spark.mesos.executor.docker.volumes") - .map(MesosSchedulerBackendUtil.parseVolumesSpec) - val portmaps = submission.schedulerProperties - .get("spark.mesos.executor.docker.portmaps") - .map(MesosSchedulerBackendUtil.parsePortMappingsSpec) - MesosSchedulerBackendUtil.addDockerInfo( - container, image, volumes = volumes, portmaps = portmaps) - taskInfo.setContainer(container.build()) + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + submission.schedulerProperties.get, + taskInfo.getContainerBuilder()) } val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo]) queuedTasks += taskInfo.build() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 99e6d39583..52993caad1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( .addAllResources(memResourcesToUse.asJava) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder) + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + sc.conf.getOption, + taskBuilder.getContainerBuilder + ) } tasks(offer.getId) ::= taskBuilder.build() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index e08dc3b595..8d4fc9eed7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setData(ByteString.copyFrom(createExecArg())) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + sc.conf.getOption, + executorInfo.getContainerBuilder() + ) } (executorInfo.build(), resourcesAfterMem.asJava) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 05b2b08944..aa669f01bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos import org.apache.mesos.Protos.{ContainerInfo, Volume} import org.apache.mesos.Protos.ContainerInfo.DockerInfo -import org.apache.spark.SparkConf import org.apache.spark.internal.Logging /** @@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { def addDockerInfo( container: ContainerInfo.Builder, image: String, + forcePullImage: Boolean = false, volumes: Option[List[Volume]] = None, network: Option[ContainerInfo.DockerInfo.Network] = None, portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = { - val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image) + val docker = ContainerInfo.DockerInfo.newBuilder() + .setImage(image) + .setForcePullImage(forcePullImage) network.foreach(docker.setNetwork) portmaps.foreach(_.foreach(docker.addPortMappings)) @@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { } /** - * Setup a docker containerizer + * Setup a docker containerizer from MesosDriverDescription scheduler properties */ def setupContainerBuilderDockerInfo( imageName: String, - conf: SparkConf, + conf: String => Option[String], builder: ContainerInfo.Builder): Unit = { - val volumes = conf - .getOption("spark.mesos.executor.docker.volumes") + val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage") + .exists(_.equals("true")) + val volumes = conf("spark.mesos.executor.docker.volumes") .map(parseVolumesSpec) - val portmaps = conf - .getOption("spark.mesos.executor.docker.portmaps") + val portmaps = conf("spark.mesos.executor.docker.portmaps") .map(parsePortMappingsSpec) + addDockerInfo( builder, imageName, + forcePullImage = forcePullImage, volumes = volumes, portmaps = portmaps) logDebug("setupContainerDockerInfo: using docker image: " + imageName) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index c2779d7b35..d3a85c654e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("docker settings are reflected in created tasks") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image", + "spark.mesos.executor.docker.forcePullImage" -> "true", + "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro", + "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched("o1").asScala + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val volumes = containerInfo.getVolumesList.asScala + assert(volumes.size == 1) + + val volume = volumes.head + assert(volume.getHostPath == "/host_vol") + assert(volume.getContainerPath == "/container_vol") + assert(volume.getMode == Volume.Mode.RO) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(dockerInfo.getForcePullImage) + + val portMappings = dockerInfo.getPortMappingsList.asScala + assert(portMappings.size == 1) + + val portMapping = portMappings.head + assert(portMapping.getHostPort == 8080) + assert(portMapping.getContainerPort == 80) + assert(portMapping.getProtocol == "tcp") + } + + test("force-pull-image option is disabled by default") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched("o1").asScala + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(!dockerInfo.getForcePullImage) + } + private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID, filter: Boolean = false): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 41693b1191..fcf39f6391 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite val conf = new SparkConf() .set("spark.mesos.executor.docker.image", "spark/mock") + .set("spark.mesos.executor.docker.forcePullImage", "true") .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") @@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite val (execInfo, _) = backend.createExecutorInfo( Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) + assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true)) val portmaps = execInfo.getContainer.getDocker.getPortMappingsList assert(portmaps.get(0).getHostPort.equals(80)) assert(portmaps.get(0).getContainerPort.equals(8080)) |