aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-02-23 12:30:57 +0000
committerSean Owen <sowen@cloudera.com>2016-02-23 12:30:57 +0000
commite99d0170982b06676110906db4de6196586829f6 (patch)
treebcd00ef6354908f8b02aa99b1eaeae5252e7a026 /core
parent87250580f214cb7c4dff01c5a3498ea6cb79a27e (diff)
downloadspark-e99d0170982b06676110906db4de6196586829f6.tar.gz
spark-e99d0170982b06676110906db4de6196586829f6.tar.bz2
spark-e99d0170982b06676110906db4de6196586829f6.zip
[SPARK-13220][CORE] deprecate yarn-client and yarn-cluster mode
Author: jerryshao <sshao@hortonworks.com> Closes #11229 from jerryshao/SPARK-13220.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala7
5 files changed, 68 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 36e240e618..b81bfb3182 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -503,6 +503,31 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
set("spark.executor.instances", value)
}
}
+
+ if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
+ val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
+ "instead use \"yarn\" with specified deploy mode."
+
+ get("spark.master") match {
+ case "yarn-cluster" =>
+ logWarning(warning)
+ set("spark.master", "yarn")
+ set("spark.submit.deployMode", "cluster")
+ case "yarn-client" =>
+ logWarning(warning)
+ set("spark.master", "yarn")
+ set("spark.submit.deployMode", "client")
+ case _ => // Any other unexpected master will be checked when creating scheduler backend.
+ }
+ }
+
+ if (contains("spark.submit.deployMode")) {
+ get("spark.submit.deployMode") match {
+ case "cluster" | "client" =>
+ case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
+ "\"client\".")
+ }
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cd7eed382e..a1fa266e18 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -237,6 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
+ def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")
private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
@@ -375,10 +376,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
- // yarn-standalone is deprecated, but still supported
- if ((master == "yarn-cluster" || master == "yarn-standalone") &&
- !_conf.contains("spark.yarn.app.id")) {
- throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
+ if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
+ throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
@@ -414,7 +413,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+ if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
@@ -491,7 +490,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
// Create and start the scheduler
- val (sched, ts) = SparkContext.createTaskScheduler(this, master)
+ val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
@@ -1590,10 +1589,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
key = uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
- // yarn-standalone is deprecated, but still supported
- if (SparkHadoopUtil.get.isYarnMode() &&
- (master == "yarn-standalone" || master == "yarn-cluster")) {
- // In order for this to work in yarn-cluster mode the user must specify the
+ if (master == "yarn" && deployMode == "cluster") {
+ // In order for this to work in yarn cluster mode the user must specify the
// --addJars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
@@ -2319,7 +2316,8 @@ object SparkContext extends Logging {
*/
private def createTaskScheduler(
sc: SparkContext,
- master: String): (SchedulerBackend, TaskScheduler) = {
+ master: String,
+ deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
@@ -2381,11 +2379,7 @@ object SparkContext extends Logging {
}
(backend, scheduler)
- case "yarn-standalone" | "yarn-cluster" =>
- if (master == "yarn-standalone") {
- logWarning(
- "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
- }
+ case "yarn" if deployMode == "cluster" =>
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
@@ -2410,7 +2404,7 @@ object SparkContext extends Logging {
scheduler.initialize(backend)
(backend, scheduler)
- case "yarn-client" =>
+ case "yarn" if deployMode == "client" =>
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
@@ -2451,7 +2445,7 @@ object SparkContext extends Logging {
case zkUrl if zkUrl.startsWith("zk://") =>
logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
- createTaskScheduler(sc, "mesos://" + zkUrl)
+ createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a6749f7e38..d5a3383932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -226,11 +226,17 @@ object SparkSubmit {
// Set the cluster manager
val clusterManager: Int = args.master match {
- case m if m.startsWith("yarn") => YARN
+ case "yarn" => YARN
+ case "yarn-client" | "yarn-cluster" =>
+ printWarning(s"Master ${args.master} is deprecated since 2.0." +
+ " Please use master \"yarn\" with specified deploy mode instead.")
+ YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
- case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
+ case _ =>
+ printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
+ -1
}
// Set the deploy mode; default is client mode
@@ -240,23 +246,20 @@ object SparkSubmit {
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
- // Because "yarn-cluster" and "yarn-client" encapsulate both the master
- // and deploy mode, we have some logic to infer the master and deploy mode
+ // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
+ // the master and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (clusterManager == YARN) {
- if (args.master == "yarn-standalone") {
- printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
- args.master = "yarn-cluster"
- }
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
+ args.master = "yarn"
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
- args.master = "yarn-" + Option(mode).getOrElse("client")
+ args.master = "yarn"
}
// Make sure YARN is included in our build if we're trying to use it
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index b96c937f02..9b6ab7b6bc 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -29,15 +29,21 @@ class SparkContextSchedulerCreationSuite
extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging {
def createTaskScheduler(master: String): TaskSchedulerImpl =
- createTaskScheduler(master, new SparkConf())
+ createTaskScheduler(master, "client")
- def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
+ def createTaskScheduler(master: String, deployMode: String): TaskSchedulerImpl =
+ createTaskScheduler(master, deployMode, new SparkConf())
+
+ def createTaskScheduler(
+ master: String,
+ deployMode: String,
+ conf: SparkConf): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
sc = new SparkContext("local", "test", conf)
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
- val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
+ val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, deployMode)
sched.asInstanceOf[TaskSchedulerImpl]
}
@@ -107,7 +113,7 @@ class SparkContextSchedulerCreationSuite
test("local-default-parallelism") {
val conf = new SparkConf().set("spark.default.parallelism", "16")
- val sched = createTaskScheduler("local", conf)
+ val sched = createTaskScheduler("local", "client", conf)
sched.backend match {
case s: LocalBackend => assert(s.defaultParallelism() === 16)
@@ -122,9 +128,9 @@ class SparkContextSchedulerCreationSuite
}
}
- def testYarn(master: String, expectedClassName: String) {
+ def testYarn(master: String, deployMode: String, expectedClassName: String) {
try {
- val sched = createTaskScheduler(master)
+ val sched = createTaskScheduler(master, deployMode)
assert(sched.getClass === Utils.classForName(expectedClassName))
} catch {
case e: SparkException =>
@@ -135,21 +141,17 @@ class SparkContextSchedulerCreationSuite
}
test("yarn-cluster") {
- testYarn("yarn-cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
- }
-
- test("yarn-standalone") {
- testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+ testYarn("yarn", "cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
}
test("yarn-client") {
- testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler")
+ testYarn("yarn", "client", "org.apache.spark.scheduler.cluster.YarnScheduler")
}
def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
try {
- val sched = createTaskScheduler(master, conf)
+ val sched = createTaskScheduler(master, "client", conf)
assert(sched.backend.getClass === expectedClass)
} catch {
case e: UnsatisfiedLinkError =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index fe2c8299a0..41ac60ece0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -358,7 +358,8 @@ class SparkSubmitSuite
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
sysProps("spark.executor.memory") should be ("5g")
- sysProps("spark.master") should be ("yarn-cluster")
+ sysProps("spark.master") should be ("yarn")
+ sysProps("spark.submit.deployMode") should be ("cluster")
mainClass should be ("org.apache.spark.deploy.yarn.Client")
}
@@ -454,7 +455,7 @@ class SparkSubmitSuite
// Test files and archives (Yarn)
val clArgs2 = Seq(
- "--master", "yarn-client",
+ "--master", "yarn",
"--class", "org.SomeClass",
"--files", files,
"--archives", archives,
@@ -512,7 +513,7 @@ class SparkSubmitSuite
writer2.println("spark.yarn.dist.archives " + archives)
writer2.close()
val clArgs2 = Seq(
- "--master", "yarn-client",
+ "--master", "yarn",
"--class", "org.SomeClass",
"--properties-file", f2.getPath,
"thejar.jar"