aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorwoshilaiceshide <woshilaiceshide@qq.com>2014-07-23 11:05:41 -0700
committerMatei Zaharia <matei@databricks.com>2014-07-23 11:05:41 -0700
commitf776bc98878428940b5130c0d7d9b7ee452c0bd3 (patch)
tree747bc863ecd13f367aac22c812228d90f90dbc18 /core/src/main/scala
parent25921110fcd5afe568bf0d25fccd232787af7911 (diff)
downloadspark-f776bc98878428940b5130c0d7d9b7ee452c0bd3.tar.gz
spark-f776bc98878428940b5130c0d7d9b7ee452c0bd3.tar.bz2
spark-f776bc98878428940b5130c0d7d9b7ee452c0bd3.zip
[CORE] SPARK-2640: In "local[N]", free cores of the only executor should be touched by "spark.task.cpus" for every finish/start-up of tasks.
Make spark's "local[N]" better. In our company, we use "local[N]" in production. It works exellentlly. It's our best choice. Author: woshilaiceshide <woshilaiceshide@qq.com> Closes #1544 from woshilaiceshide/localX and squashes the following commits: 6c85154 [woshilaiceshide] [CORE] SPARK-2640: In "local[N]", free cores of the only executor should be touched by "spark.task.cpus" for every finish/start-up of tasks.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e9f6273bfd..5b897597fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -57,7 +57,7 @@ private[spark] class LocalActor(
case StatusUpdate(taskId, state, serializedData) =>
scheduler.statusUpdate(taskId, state, serializedData)
if (TaskState.isFinished(state)) {
- freeCores += 1
+ freeCores += scheduler.CPUS_PER_TASK
reviveOffers()
}
@@ -68,7 +68,7 @@ private[spark] class LocalActor(
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
- freeCores -= 1
+ freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
}
}