author    li-zhihui <zhihui.li@intel.com>    2014-07-14 15:32:49 -0500
committer Thomas Graves <tgraves@apache.org>    2014-07-14 15:32:49 -0500
commit    3dd8af7a6623201c28231f4b71f59ea4e9ae29bf (patch)
tree      35a9569374a32a9a297d3ea4322732d528be7a6c /yarn/common
parent    d60b09bb60cff106fa0acddebf35714503b20f03 (diff)
[SPARK-1946] Submit tasks after a configured ratio of executors have been registered
Because submitting tasks and registering executors are asynchronous, in most situations tasks in early stages run without preferred locality. A simple workaround is to sleep for a few seconds in the application so that executors have enough time to register. This PR adds two configuration properties that make the TaskScheduler submit tasks only after a configured ratio of executors has registered:

# Submit tasks only after (registered executors / total executors) reaches this ratio; default value is 0
spark.scheduler.minRegisteredExecutorsRatio = 0.8

# Whether or not minRegisteredExecutorsRatio has been reached, submit tasks after maxRegisteredWaitingTime (milliseconds); default value is 30000
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000

Author: li-zhihui <zhihui.li@intel.com>

Closes #900 from li-zhihui/master and squashes the following commits:

b9f8326 [li-zhihui] Add logs & edit docs
1ac08b1 [li-zhihui] Add new configs to user docs
22ead12 [li-zhihui] Move waitBackendReady to postStartHook
c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS
4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor
0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks
4261454 [li-zhihui] Add docs for new configs & code style
ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime
6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha
812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode
e7b6272 [li-zhihui] support yarn-cluster
37f7dc2 [li-zhihui] support yarn mode(percentage style)
3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered
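For context, here is a minimal sketch (not part of this commit) of how an application might opt into the new behavior. The two property names come from the commit message above; the app name and values are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("LocalityAwareApp") // hypothetical app name
      // Submit tasks only once 80% of the requested executors have registered...
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      // ...or once 5000 ms have elapsed, whichever happens first.
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "5000")
    val sc = new SparkContext(conf)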
Diffstat (limited to 'yarn/common')
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 6
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 2
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 1
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala | 2
-rw-r--r-- yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 40
5 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 25cc9016b1..4c383ab574 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
- var numExecutors = 2
+ var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
parseArgs(args.toList)
@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
System.exit(exitCode)
}
}
+
+object ApplicationMasterArguments {
+ val DEFAULT_NUMBER_EXECUTORS = 2
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 6b91e6b9eb..15e8c21aa5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
+ super.postStartHook()
// The yarn application is running, but the executor might not yet ready
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ // TODO This sleep is unnecessary now that waitBackendReady runs in postStartHook
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
}
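Per the squashed commits above, the call to super.postStartHook() is what now blocks (via TaskSchedulerImpl's waitBackendReady) until the backend reports ready. A rough, self-contained sketch of the readiness condition the commit message describes; all names below are illustrative, not the actual implementation:

    object ReadinessSketch {
      // Ready once registered/total reaches minRegisteredExecutorsRatio,
      // or once maxRegisteredWaitingTime milliseconds have elapsed.
      def isReady(registered: Int, total: Int, minRatio: Double,
                  startTimeMs: Long, maxWaitMs: Long): Boolean = {
        val ratioReached = total > 0 && registered.toDouble / total >= minRatio
        val timedOut = System.currentTimeMillis() - startTimeMs >= maxWaitMs
        ratioReached || timedOut
      }

      // Poll until ready, as a blocking postStartHook would.
      def waitUntilReady(registered: () => Int, total: Int,
                         minRatio: Double, maxWaitMs: Long): Unit = {
        val start = System.currentTimeMillis()
        while (!isReady(registered(), total, minRatio, start, maxWaitMs)) {
          Thread.sleep(100)
        }
      }
    }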
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index fd2694fe72..0f9fdcfcb6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -75,6 +75,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
+ totalExpectedExecutors.set(args.numExecutors)
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 39cdd2e8a5..9ee53d797c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+ super.postStartHook()
if (sparkContextInitialized){
ApplicationMaster.waitForInitialAllocations()
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ // TODO This sleep is unnecessary now that waitBackendReady runs in postStartHook
Thread.sleep(3000L)
}
logInfo("YarnClusterScheduler.postStartHook done")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000000..a04b08f43c
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+ override def start() {
+ super.start()
+ var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+ numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+ }
+ // System property can override environment variable.
+ numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
+ totalExpectedExecutors.set(numExecutors)
+ }
+}
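The start() override above resolves the expected executor count in three steps: the compiled-in default, then the SPARK_EXECUTOR_INSTANCES environment variable, then the spark.executor.instances property, which is read last and therefore wins. A tiny illustrative sketch of that precedence (object and method names are hypothetical):

    object ExecutorCountSketch {
      val Default = 2 // mirrors ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS

      // Conf value (if set) overrides the env var (if parseable), which overrides the default.
      def resolve(envValue: Option[String], confValue: Option[Int]): Int = {
        val fromEnv = envValue.flatMap(s => scala.util.Try(s.toInt).toOption)
        confValue.orElse(fromEnv).getOrElse(Default)
      }
    }

    // ExecutorCountSketch.resolve(Some("4"), Some(8)) == 8
    // ExecutorCountSketch.resolve(Some("4"), None)    == 4
    // ExecutorCountSketch.resolve(None, None)         == 2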