aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala20
4 files changed, 26 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d0e2..1e9644d06e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
- val container = taskInfo.getContainerBuilder()
- val volumes = submission.schedulerProperties
- .get("spark.mesos.executor.docker.volumes")
- .map(MesosSchedulerBackendUtil.parseVolumesSpec)
- val portmaps = submission.schedulerProperties
- .get("spark.mesos.executor.docker.portmaps")
- .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
- MesosSchedulerBackendUtil.addDockerInfo(
- container, image, volumes = volumes, portmaps = portmaps)
- taskInfo.setContainer(container.build())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ submission.schedulerProperties.get,
+ taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d39583..52993caad1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ taskBuilder.getContainerBuilder
+ )
}
tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b595..8d4fc9eed7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ executorInfo.getContainerBuilder()
+ )
}
(executorInfo.build(), resourcesAfterMem.asJava)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b08944..aa669f01bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
-import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
+ forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
- val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+ val docker = ContainerInfo.DockerInfo.newBuilder()
+ .setImage(image)
+ .setForcePullImage(forcePullImage)
network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
- * Setup a docker containerizer
+ * Setup a docker containerizer from MesosDriverDescription scheduler properties
*/
def setupContainerBuilderDockerInfo(
imageName: String,
- conf: SparkConf,
+ conf: String => Option[String],
builder: ContainerInfo.Builder): Unit = {
- val volumes = conf
- .getOption("spark.mesos.executor.docker.volumes")
+ val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+ .exists(_.equals("true"))
+ val volumes = conf("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
- val portmaps = conf
- .getOption("spark.mesos.executor.docker.portmaps")
+ val portmaps = conf("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
+
addDockerInfo(
builder,
imageName,
+ forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
logDebug("setupContainerDockerInfo: using docker image: " + imageName)