aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-16 21:03:22 -0700
committerReynold Xin <rxin@databricks.com>2015-05-16 21:03:22 -0700
commit3b6ef2c5391b528ef989e24400fbb0c496c3b245 (patch)
tree969e64bd6feebda9c409b87f98e3247b2e393cd9 /core
parent161d0b4a41f453b21adde46a86e16c2743752799 (diff)
downloadspark-3b6ef2c5391b528ef989e24400fbb0c496c3b245.tar.gz
spark-3b6ef2c5391b528ef989e24400fbb0c496c3b245.tar.bz2
spark-3b6ef2c5391b528ef989e24400fbb0c496c3b245.zip
[SPARK-7655][Core] Deserializing value should not hold the TaskSchedulerImpl lock
We should not call `DirectTaskResult.value` when holding the `TaskSchedulerImpl` lock. It may cost dozens of seconds to deserialize a large object. Author: zsxwing <zsxwing@gmail.com> Closes #6195 from zsxwing/SPARK-7655 and squashes the following commits: 21f502e [zsxwing] Add more comments e25fa88 [zsxwing] Add comments 15010b5 [zsxwing] Deserialize value should not hold the TaskSchedulerImpl lock
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala6
3 files changed, 31 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 1f114a0207..8b2a742b96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -40,6 +40,9 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
+ private var valueObjectDeserialized = false
+ private var valueObject: T = _
+
def this() = this(null.asInstanceOf[ByteBuffer], null, null)
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
@@ -72,10 +75,26 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
}
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
+ valueObjectDeserialized = false
}
+ /**
+ * When `value()` is called for the first time, it needs to deserialize `valueObject` from
+ * `valueBytes`. It may cost dozens of seconds for a large instance. So when calling `value` for
+ * the first time, the caller should avoid blocking other threads.
+ *
+ * After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
+ */
def value(): T = {
- val resultSer = SparkEnv.get.serializer.newInstance()
- resultSer.deserialize(valueBytes)
+ if (valueObjectDeserialized) {
+ valueObject
+ } else {
+ // This should not run when holding a lock because it may cost dozens of seconds for a large
+ // value.
+ val resultSer = SparkEnv.get.serializer.newInstance()
+ valueObject = resultSer.deserialize(valueBytes)
+ valueObjectDeserialized = true
+ valueObject
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 391827c1d2..46a6f6537e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -54,6 +54,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
+ // Deserialize "value" without holding any lock so that it won't block other threads.
+ // We should call it here, so that when it's called again in
+ // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
+ directResult.value()
(directResult, serializedData.limit())
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 7dc325283d..c4487d5b37 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -620,6 +620,12 @@ private[spark] class TaskSetManager(
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
+ // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+ // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+ // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+ // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+ // Note: "result.value()" only deserializes the value when it's called for the first time, so
+ // here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {