author	Josh Rosen <joshrosen@databricks.com>	2016-07-25 12:43:44 -0700
committer	Josh Rosen <joshrosen@databricks.com>	2016-07-25 12:43:44 -0700
commit	fc17121d592acbd7405135cd576bafc5c574650e (patch)
tree	ea2cd031ad9617c06c44b595eab17f42baf0dbd7 /core/src
parent	3b6e1d094e153599e158331b10d33d74a667be5a (diff)
Revert "[SPARK-15271][MESOS] Allow force pulling executor docker images"
This reverts commit 978cd5f125eb5a410bad2e60bf8385b11cf1b978.
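For context, the reverted change wired the spark.mesos.executor.docker.forcePullImage property through to Mesos' DockerInfo. The sketch below is illustrative only (it is not part of this commit) and uses the property names that appear in the removed test code further down; after this revert, the forcePullImage property is no longer honored by the Mesos scheduler backends.

    import org.apache.spark.SparkConf

    // Illustrative configuration of the docker-related Mesos properties touched by this diff.
    // "forcePullImage" is the setting whose support is being reverted here.
    val conf = new SparkConf()
      .set("spark.mesos.executor.docker.image", "some_image")
      .set("spark.mesos.executor.docker.forcePullImage", "true")   // ignored after this revert
      .set("spark.mesos.executor.docker.volumes", "/host_vol:/container_vol:ro")
      .set("spark.mesos.executor.docker.portmaps", "8080:80:tcp")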
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala                   | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala      |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala        |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala               | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 63
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala   |  2
6 files changed, 22 insertions, 91 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1e9644d06e..39b0f4d0e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,10 +537,16 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- submission.schedulerProperties.get,
- taskInfo.getContainerBuilder())
+ val container = taskInfo.getContainerBuilder()
+ val volumes = submission.schedulerProperties
+ .get("spark.mesos.executor.docker.volumes")
+ .map(MesosSchedulerBackendUtil.parseVolumesSpec)
+ val portmaps = submission.schedulerProperties
+ .get("spark.mesos.executor.docker.portmaps")
+ .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
+ MesosSchedulerBackendUtil.addDockerInfo(
+ container, image, volumes = volumes, portmaps = portmaps)
+ taskInfo.setContainer(container.build())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 52993caad1..99e6d39583 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,11 +408,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- sc.conf.getOption,
- taskBuilder.getContainerBuilder
- )
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
}
tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 8d4fc9eed7..e08dc3b595 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,11 +151,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- sc.conf.getOption,
- executorInfo.getContainerBuilder()
- )
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}
(executorInfo.build(), resourcesAfterMem.asJava)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index aa669f01bd..05b2b08944 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
@@ -104,14 +105,11 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
- forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
- val docker = ContainerInfo.DockerInfo.newBuilder()
- .setImage(image)
- .setForcePullImage(forcePullImage)
+ val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -121,23 +119,21 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
- * Setup a docker containerizer from MesosDriverDescription scheduler properties
+ * Setup a docker containerizer
*/
def setupContainerBuilderDockerInfo(
imageName: String,
- conf: String => Option[String],
+ conf: SparkConf,
builder: ContainerInfo.Builder): Unit = {
- val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
- .exists(_.equals("true"))
- val volumes = conf("spark.mesos.executor.docker.volumes")
+ val volumes = conf
+ .getOption("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
- val portmaps = conf("spark.mesos.executor.docker.portmaps")
+ val portmaps = conf
+ .getOption("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
-
addDockerInfo(
builder,
imageName,
- forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
logDebug("setupContainerDockerInfo: using docker image: " + imageName)
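With this revert, setupContainerBuilderDockerInfo once again takes a SparkConf directly instead of a String => Option[String] lookup function, and addDockerInfo loses its forcePullImage parameter. A minimal sketch of a call site under the restored signature, mirroring the backend call sites above (the builder setup here is illustrative, not taken from this commit):

    import org.apache.mesos.Protos.TaskInfo
    import org.apache.spark.SparkConf

    // Sketch of the restored API: pass the SparkConf itself, not a lookup closure.
    val conf = new SparkConf().set("spark.mesos.executor.docker.image", "spark/mock")
    val taskBuilder = TaskInfo.newBuilder()
    conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
      MesosSchedulerBackendUtil
        .setupContainerBuilderDockerInfo(image, conf, taskBuilder.getContainerBuilder)
    }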
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d3a85c654e..c2779d7b35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,69 +252,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
- test("docker settings are reflected in created tasks") {
- setBackend(Map(
- "spark.mesos.executor.docker.image" -> "some_image",
- "spark.mesos.executor.docker.forcePullImage" -> "true",
- "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
- "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
- ))
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val launchedTasks = verifyTaskLaunched("o1").asScala
- assert(launchedTasks.size == 1)
-
- val containerInfo = launchedTasks.head.getContainer
- assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
- val volumes = containerInfo.getVolumesList.asScala
- assert(volumes.size == 1)
-
- val volume = volumes.head
- assert(volume.getHostPath == "/host_vol")
- assert(volume.getContainerPath == "/container_vol")
- assert(volume.getMode == Volume.Mode.RO)
-
- val dockerInfo = containerInfo.getDocker
-
- assert(dockerInfo.getImage == "some_image")
- assert(dockerInfo.getForcePullImage)
-
- val portMappings = dockerInfo.getPortMappingsList.asScala
- assert(portMappings.size == 1)
-
- val portMapping = portMappings.head
- assert(portMapping.getHostPort == 8080)
- assert(portMapping.getContainerPort == 80)
- assert(portMapping.getProtocol == "tcp")
- }
-
- test("force-pull-image option is disabled by default") {
- setBackend(Map(
- "spark.mesos.executor.docker.image" -> "some_image"
- ))
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val launchedTasks = verifyTaskLaunched("o1").asScala
- assert(launchedTasks.size == 1)
-
- val containerInfo = launchedTasks.head.getContainer
- assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
- val dockerInfo = containerInfo.getDocker
-
- assert(dockerInfo.getImage == "some_image")
- assert(!dockerInfo.getForcePullImage)
- }
-
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index fcf39f6391..41693b1191 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,7 +150,6 @@ class MesosFineGrainedSchedulerBackendSuite
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
- .set("spark.mesos.executor.docker.forcePullImage", "true")
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
@@ -170,7 +169,6 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
- assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))