aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Reynold Xin <reynoldx@gmail.com> 2013-07-23 20:28:39 -0700
committer Reynold Xin <reynoldx@gmail.com> 2013-07-23 20:28:39 -0700
commit d33b8a2a0ffd6d085cbd8de22863a1f35c106270 (patch)
tree 1d78210d7acb2035dc2d2f69b699394447ae2e0c /core
parent 5ed38b4d1d7154235d5b72c35d3a8e63bac3a2de (diff)
download spark-d33b8a2a0ffd6d085cbd8de22863a1f35c106270.tar.gz
spark-d33b8a2a0ffd6d085cbd8de22863a1f35c106270.tar.bz2
spark-d33b8a2a0ffd6d085cbd8de22863a1f35c106270.zip
Added comments on task closure serialization.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala 2
-rw-r--r-- core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala 2
2 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index d934293b70..f64818876b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -515,6 +515,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
// Serialize and return the task
val startTime = System.currentTimeMillis
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index bbce9eda64..a9b49cad0e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -114,6 +114,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
TaskLocality.NODE_LOCAL)
taskInfos(taskId) = info
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
val bytes = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")