diff options
author | Philipp Hoffmann <mail@philipphoffmann.de> | 2016-07-25 20:14:47 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-07-25 20:14:47 +0100 |
commit | 978cd5f125eb5a410bad2e60bf8385b11cf1b978 (patch) | |
tree | c33a44dc7487259409a64eb515ae6c6e002c56fb /core/src/test/scala | |
parent | dd784a8822497ad0631208d56325c4d74ab9e036 (diff) | |
download | spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.tar.gz spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.tar.bz2 spark-978cd5f125eb5a410bad2e60bf8385b11cf1b978.zip |
[SPARK-15271][MESOS] Allow force pulling executor docker images
## What changes were proposed in this pull request?
Mesos agents by default will not pull docker images which are cached
locally already. In order to run Spark executors from mutable tags like
`:latest` this commit introduces a Spark setting
`spark.mesos.executor.docker.forcePullImage`. Setting this flag to
true will tell the Mesos agent to force pull the docker image (default is `false` which is consistent with the previous
implementation and Mesos' default
behaviour).
## How was this patch tested?
I ran a sample application including this change on a Mesos cluster and verified the correct behaviour for both, with and without, force pulling the executor image. As expected the image is being force pulled if the flag is set.
Author: Philipp Hoffmann <mail@philipphoffmann.de>
Closes #13051 from philipphoffmann/force-pull-image.
Diffstat (limited to 'core/src/test/scala')
2 files changed, 65 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index c2779d7b35..d3a85c654e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("docker settings are reflected in created tasks") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image", + "spark.mesos.executor.docker.forcePullImage" -> "true", + "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro", + "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched("o1").asScala + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val volumes = containerInfo.getVolumesList.asScala + assert(volumes.size == 1) + + val volume = volumes.head + assert(volume.getHostPath == "/host_vol") + assert(volume.getContainerPath == "/container_vol") + assert(volume.getMode == Volume.Mode.RO) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(dockerInfo.getForcePullImage) + + val portMappings = dockerInfo.getPortMappingsList.asScala + assert(portMappings.size == 1) + + val portMapping = portMappings.head + assert(portMapping.getHostPort == 8080) + assert(portMapping.getContainerPort == 80) + assert(portMapping.getProtocol == "tcp") + } + + test("force-pull-image option is disabled by default") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched("o1").asScala + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(!dockerInfo.getForcePullImage) + } + private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID, filter: Boolean = false): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 41693b1191..fcf39f6391 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite val conf = new SparkConf() .set("spark.mesos.executor.docker.image", "spark/mock") + .set("spark.mesos.executor.docker.forcePullImage", "true") .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") @@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite val (execInfo, _) = backend.createExecutorInfo( Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) + assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true)) val portmaps = execInfo.getContainer.getDocker.getPortMappingsList assert(portmaps.get(0).getHostPort.equals(80)) assert(portmaps.get(0).getContainerPort.equals(8080)) |