aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Akshat Aranya <aaranya@quantcast.com> 2015-05-22 22:03:31 -0700
committer Josh Rosen <joshrosen@databricks.com> 2015-05-22 22:03:31 -0700
commit a16357413d2823bcc1d1bf55b4da191dc9b1b69a (patch)
tree 55fa8fa2bc4e33267a29d9962ec3af65d70567be
parent 63a5ce75eac48a297751ac505d70ce4d47daf903 (diff)
download spark-a16357413d2823bcc1d1bf55b4da191dc9b1b69a.tar.gz
spark-a16357413d2823bcc1d1bf55b4da191dc9b1b69a.tar.bz2
spark-a16357413d2823bcc1d1bf55b4da191dc9b1b69a.zip
[SPARK-7795] [CORE] Speed up task scheduling in standalone mode by reusing serializer
My experiments with scheduling very short tasks in standalone cluster mode indicated that a significant amount of time was being spent scheduling the tasks (>500 ms for 256 tasks). I found that most of that time was spent creating a new serializer instance for each task. Reusing a single serializer instance brought the scheduling time down to 8 ms.

Author: Akshat Aranya <aaranya@quantcast.com>

Closes #6323 from coolfrood/master and squashes the following commits:

12d8c9e [Akshat Aranya] Reduce visibility of serializer
bd4a5dd [Akshat Aranya] Style fix
0b8ca93 [Akshat Aranya] Incorporate review comments
fe530cd [Akshat Aranya] Speed up task scheduling in standalone mode by reusing serializer instead of creating a new one for each task.
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala 12
1 files changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f107148f3b..c5bc6294a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -69,6 +69,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
+ // If this DriverEndpoint is changed to support multiple threads,
+ // then this may need to be changed so that we don't share the serializer
+ // instance across threads
+ private val ser = SparkEnv.get.closureSerializer.newInstance()
+
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -163,7 +168,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Make fake resource offers on all executors
- def makeOffers() {
+ private def makeOffers() {
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
@@ -175,16 +180,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Make fake resource offers on just one executor
- def makeOffers(executorId: String) {
+ private def makeOffers(executorId: String) {
val executorData = executorDataMap(executorId)
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}
// Launch tasks returned by a set of resource offers
- def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+ private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)