author    Michael Gummelt <mgummelt@mesosphere.io>  2016-07-29 05:50:47 -0700
committer Sean Owen <sowen@cloudera.com>            2016-07-29 05:50:47 -0700
commit    266b92faffb66af24d8ed2725beb80770a2d91f8
tree      408d3ccb12dcb41ae577600eb1fadaebe79f5482
parent    04a2c072d94874f3f7ae9dd94c026e8826a75ccd
[SPARK-16637] Unified containerizer
## What changes were proposed in this pull request?

New config var: spark.mesos.docker.containerizer={"mesos", "docker" (default)}

This adds support for running docker containers via the Mesos unified containerizer: http://mesos.apache.org/documentation/latest/container-image/ The benefit is losing the dependency on `dockerd`, and all the costs which it incurs.

I've also updated the supported Mesos version to 0.28.2 for support of the required protobufs.

This is blocked on: https://github.com/apache/spark/pull/14167

## How was this patch tested?

- manually testing jobs submitted with both "mesos" and "docker" settings for the new config var.
- spark/mesos integration test suite

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14275 from mgummelt/unified-containerizer.
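As an illustration (not part of the patch), a job can opt its executors into the unified containerizer through the new configuration. This is a minimal sketch: the app name, master URL, and image name are hypothetical, and the key `spark.mesos.containerizer` is the one used by the code and docs in this patch.

```scala
// Minimal sketch (assumed app/master/image names): ask Mesos to run the Docker image
// with the unified ("mesos") containerizer instead of dockerd. "docker" stays the default.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("unified-containerizer-example")                               // hypothetical
  .setMaster("mesos://zk://zk1:2181/mesos")                                  // hypothetical
  .set("spark.mesos.executor.docker.image", "example/spark-executor:2.1.0")  // hypothetical
  .set("spark.mesos.containerizer", "mesos")                                 // new in this patch

val sc = new SparkContext(conf)
```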
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskState.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 85
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 32
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.7 | 2
-rw-r--r--  docs/_config.yml | 2
-rw-r--r--  docs/running-on-mesos.md | 10
-rw-r--r--  pom.xml | 2
18 files changed, 149 insertions, 79 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f6af9ccc41..b6d244b1a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
@@ -370,6 +370,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
}
+ /** Get all parameters that start with `prefix` */
+ def getAllWithPrefix(prefix: String): Array[(String, String)] = {
+ getAll.filter { case (k, v) => k.startsWith(prefix) }
+ .map { case (k, v) => (k.substring(prefix.length), v) }
+ }
+
+
/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
getOption(key).map(_.toInt).getOrElse(defaultValue)
@@ -392,9 +399,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
- val prefix = "spark.executorEnv."
- getAll.filter{case (k, v) => k.startsWith(prefix)}
- .map{case (k, v) => (k.substring(prefix.length), v)}
+ getAllWithPrefix("spark.executorEnv.")
}
/**
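For reference, a small sketch of the new `getAllWithPrefix` helper added above, which generalizes the old `getExecutorEnv` filtering. The keys and values here are illustrative only.

```scala
// Illustrative only: getAllWithPrefix returns the matching keys with the prefix stripped.
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
  .set("spark.executorEnv.PATH", "/usr/bin")
  .set("spark.executorEnv.LD_LIBRARY_PATH", "/usr/lib")

// Array(("PATH", "/usr/bin"), ("LD_LIBRARY_PATH", "/usr/lib")) -- order not guaranteed
val env: Array[(String, String)] = conf.getAllWithPrefix("spark.executorEnv.")
```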
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index fe19f07e32..d232fae6b1 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -41,13 +41,11 @@ private[spark] object TaskState extends Enumeration {
}
def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
- case MesosTaskState.TASK_STAGING => LAUNCHING
- case MesosTaskState.TASK_STARTING => LAUNCHING
- case MesosTaskState.TASK_RUNNING => RUNNING
+ case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
+ case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
case MesosTaskState.TASK_FINISHED => FINISHED
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
- case MesosTaskState.TASK_LOST => LOST
- case MesosTaskState.TASK_ERROR => LOST
+ case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
index 1948226800..d4c7022f00 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.mesos
import java.util.Date
+import org.apache.spark.SparkConf
import org.apache.spark.deploy.Command
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
@@ -40,12 +41,15 @@ private[spark] class MesosDriverDescription(
val cores: Double,
val supervise: Boolean,
val command: Command,
- val schedulerProperties: Map[String, String],
+ schedulerProperties: Map[String, String],
val submissionId: String,
val submissionDate: Date,
val retryState: Option[MesosClusterRetryState] = None)
extends Serializable {
+ val conf = new SparkConf(false)
+ schedulerProperties.foreach {case (k, v) => conf.set(k, v)}
+
def copy(
name: String = name,
jarUrl: String = jarUrl,
@@ -53,11 +57,12 @@ private[spark] class MesosDriverDescription(
cores: Double = cores,
supervise: Boolean = supervise,
command: Command = command,
- schedulerProperties: Map[String, String] = schedulerProperties,
+ schedulerProperties: SparkConf = conf,
submissionId: String = submissionId,
submissionDate: Date = submissionDate,
retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
- new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
+
+ new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
submissionId, submissionDate, retryState)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index 807835105e..cd98110ddc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -50,7 +50,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
val driverDescription = Iterable.apply(driverState.description)
val submissionState = Iterable.apply(driverState.submissionState)
val command = Iterable.apply(driverState.description.command)
- val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties)
+ val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
val commandEnv = Iterable.apply(driverState.description.command.environment)
val driverTable =
UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1e9644d06e..ae531e1997 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -353,19 +353,16 @@ private[spark] class MesosClusterScheduler(
}
}
- private def getDriverExecutorURI(desc: MesosDriverDescription) = {
- desc.schedulerProperties.get("spark.executor.uri")
- .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+ private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
+ desc.conf.getOption("spark.executor.uri")
+ .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
val env = {
- val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+ val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-
- val prefix = "spark.mesos.driverEnv."
- val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
- .map { case (k, v) => (k.substring(prefix.length), v) }
+ val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
driverEnv ++ executorEnv ++ desc.command.environment
}
@@ -379,8 +376,8 @@ private[spark] class MesosClusterScheduler(
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
- desc.schedulerProperties.get("spark.mesos.uris"),
- desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
+ desc.conf.getOption("spark.mesos.uris"),
+ desc.conf.getOption("spark.submit.pyFiles")).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
@@ -391,7 +388,7 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverCommandValue(desc: MesosDriverDescription): String = {
- val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
+ val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
@@ -411,7 +408,7 @@ private[spark] class MesosClusterScheduler(
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
- val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
+ val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
@@ -438,7 +435,7 @@ private[spark] class MesosClusterScheduler(
private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
var options = Seq(
- "--name", desc.schedulerProperties("spark.app.name"),
+ "--name", desc.conf.get("spark.app.name"),
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
@@ -454,19 +451,19 @@ private[spark] class MesosClusterScheduler(
options ++= Seq("--class", desc.command.mainClass)
}
- desc.schedulerProperties.get("spark.executor.memory").map { v =>
+ desc.conf.getOption("spark.executor.memory").foreach { v =>
options ++= Seq("--executor-memory", v)
}
- desc.schedulerProperties.get("spark.cores.max").map { v =>
+ desc.conf.getOption("spark.cores.max").foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
- desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+ desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val formattedFiles = pyFiles.split(",")
.map { path => new File(sandboxPath, path.split("/").last).toString() }
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
- desc.schedulerProperties
+ desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
options
@@ -476,6 +473,7 @@ private[spark] class MesosClusterScheduler(
* Escape args for Unix-like shells, unless already quoted by the user.
* Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
* and http://www.grymoire.com/Unix/Quote.html
+ *
* @param value argument
* @return escaped argument
*/
@@ -498,6 +496,33 @@ private[spark] class MesosClusterScheduler(
}
}
+ private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
+ val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
+
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.resources, "cpus", desc.cores)
+ val (finalResources, memResourcesToUse) =
+ partitionResources(remainingResources.asJava, "mem", desc.mem)
+ offer.resources = finalResources.asJava
+
+ val appName = desc.conf.get("spark.app.name")
+ val taskInfo = TaskInfo.newBuilder()
+ .setTaskId(taskId)
+ .setName(s"Driver for ${appName}")
+ .setSlaveId(offer.slaveId)
+ .setCommand(buildDriverCommand(desc))
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+
+ desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
+ desc.conf,
+ taskInfo.getContainerBuilder)
+ }
+
+ taskInfo.build
+ }
+
/**
* This method takes all the possible candidates and attempt to schedule them with Mesos offers.
* Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -521,32 +546,12 @@ private[spark] class MesosClusterScheduler(
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
- val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
- val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.resources, "cpus", driverCpu)
- val (finalResources, memResourcesToUse) =
- partitionResources(remainingResources.asJava, "mem", driverMem)
- val commandInfo = buildDriverCommand(submission)
- val appName = submission.schedulerProperties("spark.app.name")
- val taskInfo = TaskInfo.newBuilder()
- .setTaskId(taskId)
- .setName(s"Driver for $appName")
- .setSlaveId(offer.slaveId)
- .setCommand(commandInfo)
- .addAllResources(cpuResourcesToUse.asJava)
- .addAllResources(memResourcesToUse.asJava)
- offer.resources = finalResources.asJava
- submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
- image,
- submission.schedulerProperties.get,
- taskInfo.getContainerBuilder())
- }
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
- queuedTasks += taskInfo.build()
+ val task = createTaskInfo(submission, offer)
+ queuedTasks += task
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
+ val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 52993caad1..959d6fd46d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -410,7 +410,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
- sc.conf.getOption,
+ sc.conf,
taskBuilder.getContainerBuilder
)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 8d4fc9eed7..d8d661da31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -153,7 +153,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
- sc.conf.getOption,
+ sc.conf,
executorInfo.getContainerBuilder()
)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index aa669f01bd..3fe06743b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,9 +17,10 @@
package org.apache.spark.scheduler.cluster.mesos
-import org.apache.mesos.Protos.{ContainerInfo, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
/**
@@ -104,19 +105,33 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
+ containerizer: String,
forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
- network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
- val docker = ContainerInfo.DockerInfo.newBuilder()
- .setImage(image)
- .setForcePullImage(forcePullImage)
+ containerizer match {
+ case "docker" =>
+ container.setType(ContainerInfo.Type.DOCKER)
+ val docker = ContainerInfo.DockerInfo.newBuilder()
+ .setImage(image)
+ .setForcePullImage(forcePullImage)
+ // TODO (mgummelt): Remove this. Portmaps have no effect,
+ // as we don't support bridge networking.
+ portmaps.foreach(_.foreach(docker.addPortMappings))
+ container.setDocker(docker)
+ case "mesos" =>
+ container.setType(ContainerInfo.Type.MESOS)
+ val imageProto = Image.newBuilder()
+ .setType(Image.Type.DOCKER)
+ .setDocker(Image.Docker.newBuilder().setName(image))
+ .setCached(!forcePullImage)
+ container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
+ case _ =>
+ throw new SparkException(
+ "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+ }
- network.foreach(docker.setNetwork)
- portmaps.foreach(_.foreach(docker.addPortMappings))
- container.setType(ContainerInfo.Type.DOCKER)
- container.setDocker(docker.build())
volumes.foreach(_.foreach(container.addVolumes))
}
@@ -125,18 +140,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
*/
def setupContainerBuilderDockerInfo(
imageName: String,
- conf: String => Option[String],
+ conf: SparkConf,
builder: ContainerInfo.Builder): Unit = {
- val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+ val forcePullImage = conf
+ .getOption("spark.mesos.executor.docker.forcePullImage")
.exists(_.equals("true"))
- val volumes = conf("spark.mesos.executor.docker.volumes")
+ val volumes = conf
+ .getOption("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
- val portmaps = conf("spark.mesos.executor.docker.portmaps")
+ val portmaps = conf
+ .getOption("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
+ val containerizer = conf.get("spark.mesos.containerizer", "docker")
addDockerInfo(
builder,
imageName,
+ containerizer,
forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)
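With the signature change above, callers hand the `SparkConf` directly to `setupContainerBuilderDockerInfo`, and the containerizer choice is read from `spark.mesos.containerizer`. A rough sketch of how the pieces fit together (the wrapper object and image name are hypothetical, and the code must live in the `org.apache.spark.scheduler.cluster.mesos` package because the helper is `private[mesos]`):

```scala
// Sketch only: build a ContainerInfo for an executor, honoring spark.mesos.containerizer.
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.ContainerInfo
import org.apache.spark.SparkConf

object ContainerizerExample {  // hypothetical helper, not part of the patch
  def containerFor(conf: SparkConf): Option[ContainerInfo] = {
    conf.getOption("spark.mesos.executor.docker.image").map { image =>
      val builder = ContainerInfo.newBuilder()
      // With "mesos" this sets Type.MESOS plus a Docker Image proto (cached unless
      // forcePullImage is set); with the default "docker" it sets Type.DOCKER + DockerInfo.
      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image, conf, builder)
      builder.build()
    }
  }
}

// e.g. ContainerizerExample.containerFor(new SparkConf(false)
//   .set("spark.mesos.executor.docker.image", "example/spark-executor:2.1.0")
//   .set("spark.mesos.containerizer", "mesos"))
```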
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7355ba317d..cd4b45f8de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
+
/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods and Mesos scheduler will use.
@@ -79,7 +80,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
credBuilder.setPrincipal(principal)
}
conf.getOption("spark.mesos.secret").foreach { secret =>
- credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+ credBuilder.setSecret(secret)
}
if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
throw new SparkException(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 51d262e75e..a74fdf79a1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -109,7 +109,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
assert(cpus == executorCores)
}
@@ -123,7 +123,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
assert(cpus == offerCores)
}
@@ -137,7 +137,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
assert(cpus == maxCores)
}
@@ -252,6 +252,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
+ test("honors unset spark.mesos.containerizer") {
+ setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
+ }
+
+ test("honors spark.mesos.containerizer=\"mesos\"") {
+ setBackend(Map(
+ "spark.mesos.executor.docker.image" -> "test",
+ "spark.mesos.containerizer" -> "mesos"))
+
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu)
+ backend.resourceOffers(driver, List(offer1).asJava)
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
+ }
+
test("docker settings are reflected in created tasks") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "some_image",
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 9350b9df50..d0771e1ac8 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 2e1a6a3dc6..ef97ffd9ab 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 9baf87e532..fba3c18b14 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9112452b5c..9747acda81 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index b0e3e9304b..7231bcaf6c 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index bbb576e0e7..e4fc093fe7 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.1.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.22.0
+MESOS_VERSION: 1.0.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ce888b5445..d037e7be0a 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -433,6 +433,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.containerizer</code></td>
+ <td><code>docker</code></td>
+ <td>
+ This only affects docker containers, and must be one of "docker"
+ or "mesos". Mesos supports two types of
+ containerizers for docker: the "docker" containerizer, and the preferred
+ "mesos" containerizer. Read more here: http://mesos.apache.org/documentation/latest/container-image/
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.driver.webui.url</code></td>
<td><code>(none)</code></td>
<td>
diff --git a/pom.xml b/pom.xml
index 9b7be371bb..0491e981d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
- <mesos.version>0.22.2</mesos.version>
+ <mesos.version>1.0.0</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>