aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorli-zhihui <zhihui.li@intel.com>2014-07-14 15:32:49 -0500
committerThomas Graves <tgraves@apache.org>2014-07-14 15:32:49 -0500
commit3dd8af7a6623201c28231f4b71f59ea4e9ae29bf (patch)
tree35a9569374a32a9a297d3ea4322732d528be7a6c /core/src/main/scala
parentd60b09bb60cff106fa0acddebf35714503b20f03 (diff)
downloadspark-3dd8af7a6623201c28231f4b71f59ea4e9ae29bf.tar.gz
spark-3dd8af7a6623201c28231f4b71f59ea4e9ae29bf.tar.bz2
spark-3dd8af7a6623201c28231f4b71f59ea4e9ae29bf.zip
[SPARK-1946] Submit tasks after (configured ratio) executors have been registered
Because submitting tasks and registering executors are asynchronous, in most situation, early stages' tasks run without preferred locality. A simple solution is sleeping few seconds in application, so that executors have enough time to register. The PR add 2 configuration properties to make TaskScheduler submit tasks after a few of executors have been registered. \# Submit tasks only after (registered executors / total executors) arrived the ratio, default value is 0 spark.scheduler.minRegisteredExecutorsRatio = 0.8 \# Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the maxRegisteredWaitingTime(millisecond), default value is 30000 spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000 Author: li-zhihui <zhihui.li@intel.com> Closes #900 from li-zhihui/master and squashes the following commits: b9f8326 [li-zhihui] Add logs & edit docs 1ac08b1 [li-zhihui] Add new configs to user docs 22ead12 [li-zhihui] Move waitBackendReady to postStartHook c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS 4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor 0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks 4261454 [li-zhihui] Add docs for new configs & code style ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime 6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha 812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode e7b6272 [li-zhihui] support yarn-cluster 37f7dc2 [li-zhihui] support yarn mode(percentage style) 3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala1
5 files changed, 56 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8819e73d17..8052499ab7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
throw new SparkException("YARN mode not available ?", e)
}
}
- val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ val backend = try {
+ val clazz =
+ Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+ val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+ cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+ } catch {
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
+ }
+ }
scheduler.initialize(backend)
scheduler
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 6a6d8e609b..e41e0a9841 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
+ def isReady(): Boolean = true
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5ed2803d76..4b6d6da5a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
}
}
+ override def postStartHook() {
+ waitBackendReady()
+ }
+
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
+
+ private def waitBackendReady(): Unit = {
+ if (backend.isReady) {
+ return
+ }
+ while (!backend.isReady) {
+ synchronized {
+ this.wait(100)
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 05d01b0c82..0f5545e2ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ var totalExpectedExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ // Submit tasks only after (registered executors / total expected executors)
+ // is equal to at least this value, that is double between 0 and 1.
+ var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+ if (minRegisteredRatio > 1) minRegisteredRatio = 1
+ // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+ val maxRegisteredWaitingTime =
+ conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+ val createTime = System.currentTimeMillis()
+ var ready = if (minRegisteredRatio <= 0) true else false
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
+ if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
+ ready = true
+ logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
+ executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
+ ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
+ }
makeOffers()
}
@@ -247,6 +263,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
}
}
+
+ override def isReady(): Boolean = {
+ if (ready) {
+ return true
+ }
+ if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+ ready = true
+ logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
+ "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+ return true
+ }
+ false
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c07b3f7b6..bf2dc88e29 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
+ totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}