aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala63
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala2
-rw-r--r--dev/deps/spark-deps-hadoop-2.22
-rw-r--r--dev/deps/spark-deps-hadoop-2.32
-rw-r--r--dev/deps/spark-deps-hadoop-2.42
-rw-r--r--dev/deps/spark-deps-hadoop-2.62
-rw-r--r--dev/deps/spark-deps-hadoop-2.72
-rw-r--r--docs/_config.yml2
-rw-r--r--docs/running-on-mesos.md12
-rw-r--r--pom.xml2
14 files changed, 110 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d0e2..1e9644d06e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
- val container = taskInfo.getContainerBuilder()
- val volumes = submission.schedulerProperties
- .get("spark.mesos.executor.docker.volumes")
- .map(MesosSchedulerBackendUtil.parseVolumesSpec)
- val portmaps = submission.schedulerProperties
- .get("spark.mesos.executor.docker.portmaps")
- .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
- MesosSchedulerBackendUtil.addDockerInfo(
- container, image, volumes = volumes, portmaps = portmaps)
- taskInfo.setContainer(container.build())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ submission.schedulerProperties.get,
+ taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d39583..52993caad1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ taskBuilder.getContainerBuilder
+ )
}
tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b595..8d4fc9eed7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+ image,
+ sc.conf.getOption,
+ executorInfo.getContainerBuilder()
+ )
}
(executorInfo.build(), resourcesAfterMem.asJava)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b08944..aa669f01bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
-import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
+ forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
- val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+ val docker = ContainerInfo.DockerInfo.newBuilder()
+ .setImage(image)
+ .setForcePullImage(forcePullImage)
network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
- * Setup a docker containerizer
+ * Setup a docker containerizer from MesosDriverDescription scheduler properties
*/
def setupContainerBuilderDockerInfo(
imageName: String,
- conf: SparkConf,
+ conf: String => Option[String],
builder: ContainerInfo.Builder): Unit = {
- val volumes = conf
- .getOption("spark.mesos.executor.docker.volumes")
+ val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+ .exists(_.equals("true"))
+ val volumes = conf("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
- val portmaps = conf
- .getOption("spark.mesos.executor.docker.portmaps")
+ val portmaps = conf("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
+
addDockerInfo(
builder,
imageName,
+ forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
logDebug("setupContainerDockerInfo: using docker image: " + imageName)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c2779d7b35..51d262e75e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
+ test("docker settings are reflected in created tasks") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "some_image",
+ "spark.mesos.executor.docker.forcePullImage" -> "true",
+ "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+ "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ assert(launchedTasks.size == 1)
+
+ val containerInfo = launchedTasks.head.getContainer
+ assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+ val volumes = containerInfo.getVolumesList.asScala
+ assert(volumes.size == 1)
+
+ val volume = volumes.head
+ assert(volume.getHostPath == "/host_vol")
+ assert(volume.getContainerPath == "/container_vol")
+ assert(volume.getMode == Volume.Mode.RO)
+
+ val dockerInfo = containerInfo.getDocker
+
+ assert(dockerInfo.getImage == "some_image")
+ assert(dockerInfo.getForcePullImage)
+
+ val portMappings = dockerInfo.getPortMappingsList.asScala
+ assert(portMappings.size == 1)
+
+ val portMapping = portMappings.head
+ assert(portMapping.getHostPort == 8080)
+ assert(portMapping.getContainerPort == 80)
+ assert(portMapping.getProtocol == "tcp")
+ }
+
+ test("force-pull-image option is disabled by default") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "some_image"
+ ))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ assert(launchedTasks.size == 1)
+
+ val containerInfo = launchedTasks.head.getContainer
+ assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+ val dockerInfo = containerInfo.getDocker
+
+ assert(dockerInfo.getImage == "some_image")
+ assert(!dockerInfo.getForcePullImage)
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 41693b1191..fcf39f6391 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
+ .set("spark.mesos.executor.docker.forcePullImage", "true")
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+ assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 5d536b7c24..ff15873140 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index d16f42a97d..2b5764f868 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 2e261cb9a5..3f53fdb09c 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 67f38f4c22..d3a7ab8bb4 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 07583963d9..05317a044d 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index be3d8a2fe6..bbb576e0e7 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.1.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.21.0
+MESOS_VERSION: 0.22.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 10dc9ce890..ce888b5445 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.
Requires Mesos version 0.20.1 or later.
+Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
+tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
+image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
+
# Running Alongside Hadoop
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -335,6 +339,14 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
+ <td>false</td>
+ <td>
+ Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
+ By default Mesos agents will not pull images they already have cached.
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.executor.docker.volumes</code></td>
<td>(none)</td>
<td>
diff --git a/pom.xml b/pom.xml
index d064cb57dd..b69292d188 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
- <mesos.version>0.21.1</mesos.version>
+ <mesos.version>0.22.2</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>