author     Marcelo Vanzin <vanzin@cloudera.com>      2016-09-21 14:42:41 -0700
committer  Shixiong Zhu <shixiong@databricks.com>    2016-09-21 14:42:41 -0700
commit     2cd1bfa4f0c6625b0ab1dbeba2b9586b9a6a9f42 (patch)
tree       f0734e443661a59da6b14d496aeacd16809ecb86 /mesos
parent     b4a4421b610e776e5280fd5e7453f937f806cbd1 (diff)
[SPARK-4563][CORE] Allow driver to advertise a different network address.
The goal of this feature is to allow the Spark driver to run in an isolated environment, such as a docker container, and use the host's port forwarding mechanism to accept connections from the outside world.

The change is restricted to the driver: there is no support for achieving the same thing on executors (or the YARN AM, for that matter). Those still need full access to the outside world so that, for example, connections can be made to an executor's block manager.

The core of the change is simple: add a new configuration that tells the driver which address to bind to, which can be different from the address it advertises to executors (spark.driver.host). Everything else is plumbing the new configuration through to where it's needed.

To use the feature, the host starting the container needs to set up the driver's port range to fall within a range that is being forwarded. This required a block manager port configuration specific to the driver, which falls back to the existing spark.blockManager.port when not set. This way, users can modify the driver settings without affecting the executors. It would theoretically be nice to also have different retry counts for driver and executors, but given that docker (at least) allows forwarding port ranges, we can probably live without that for now.

Because of the nature of the feature it's hard to add unit tests; I just added a simple one to make sure the configuration works.

This was tested with a docker image running spark-shell with the following command:

    docker blah blah blah \
      -p 38000-38100:38000-38100 \
      [image] \
      spark-shell \
        --num-executors 3 \
        --conf spark.shuffle.service.enabled=false \
        --conf spark.dynamicAllocation.enabled=false \
        --conf spark.driver.host=[host's address] \
        --conf spark.driver.port=38000 \
        --conf spark.driver.blockManager.port=38020 \
        --conf spark.ui.port=38040

Ran on YARN; verified that the driver works, that executors start up and listen on ephemeral ports (instead of using the driver's config), and that caching and shuffling (without the shuffle service) work. Clicked through the UI to make sure all pages (including executor thread dumps) worked. Also tested apps without docker, and ran unit tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15120 from vanzin/SPARK-4563.
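As a note for readers of this page: below is a minimal sketch of the configuration behavior the commit message describes, written against only the public SparkConf API. The key names spark.driver.bindAddress and spark.driver.blockManager.port are what the patch introduces in core, which this mesos-only view does not show, so treat this as illustrative rather than as the patch's actual plumbing:

    import org.apache.spark.SparkConf

    // Illustrative sketch only: the driver binds to bindAddress (if set),
    // advertises spark.driver.host to executors, and prefers its own block
    // manager port, falling back to the shared spark.blockManager.port
    // (default 0 = pick an ephemeral port) when the driver key is not set.
    def driverNetworkSettings(conf: SparkConf): (String, String, Int) = {
      val advertised = conf.get("spark.driver.host", "localhost")
      val bind = conf.get("spark.driver.bindAddress", advertised)
      val bmPort = conf.getInt("spark.driver.blockManager.port",
        conf.getInt("spark.blockManager.port", 0))
      (bind, advertised, bmPort)
    }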
Diffstat (limited to 'mesos')
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala                     3
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala  5
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala                 3
3 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index e19d445137..2963d161d6 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -32,6 +32,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
@@ -424,7 +425,7 @@ trait MesosSchedulerUtils extends Logging {
}
}
- val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
+ val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
/**
* The values of the non-zero ports to be used by the executor process.
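The BLOCK_MANAGER_PORT entry referenced in this hunk comes from core's org.apache.spark.internal.config package, whose defining change is outside this mesos-only diff. A sketch of what such a typed entry plausibly looks like under Spark's ConfigBuilder API (an assumption, since the defining hunk is not shown here); its .key recovers the string literal removed above:

    import org.apache.spark.internal.config.ConfigBuilder

    // Sketch of the assumed definition: a typed Int entry whose .key is
    // "spark.blockManager.port" and whose default of 0 means "ephemeral".
    // In Spark this lives in the org.apache.spark.internal.config package
    // object, hence the private[spark] modifier.
    private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
      .intConf
      .createWithDefault(0)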
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index bbc79dd1ed..c3ab488e2a 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -221,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("Port offer decline when there is no appropriate range") {
- setBackend(Map("spark.blockManager.port" -> "30100"))
+ setBackend(Map(BLOCK_MANAGER_PORT.key -> "30100"))
val offeredPorts = (31100L, 31200L)
val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -242,7 +243,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("Port offer accepted with user defined port numbers") {
val port = 30100
- setBackend(Map("spark.blockManager.port" -> s"$port"))
+ setBackend(Map(BLOCK_MANAGER_PORT.key -> s"$port"))
val offeredPorts = (30000L, 31000L)
val (mem, cpu) = (backend.executorMemory(sc), 4)
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index e3d794931a..ec47ab1531 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest._
import org.scalatest.mock.MockitoSugar
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
@@ -179,7 +180,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only") {
val conf = new SparkConf()
conf.set("spark.executor.port", "3000" )
- conf.set("spark.blockManager.port", "4000")
+ conf.set(BLOCK_MANAGER_PORT, 4000)
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
val (resourcesLeft, resourcesToBeUsed) = utils