diff options
author | Reynold Xin <reynoldx@gmail.com> | 2013-07-23 20:28:39 -0700 |
---|---|---|
committer | Reynold Xin <reynoldx@gmail.com> | 2013-07-23 20:28:39 -0700 |
commit | d33b8a2a0ffd6d085cbd8de22863a1f35c106270 (patch) | |
tree | 1d78210d7acb2035dc2d2f69b699394447ae2e0c | |
parent | 5ed38b4d1d7154235d5b72c35d3a8e63bac3a2de (diff) | |
download | spark-d33b8a2a0ffd6d085cbd8de22863a1f35c106270.tar.gz spark-d33b8a2a0ffd6d085cbd8de22863a1f35c106270.tar.bz2 spark-d33b8a2a0ffd6d085cbd8de22863a1f35c106270.zip |
Added comments on task closure serialization.
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala | 2 |
2 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala index d934293b70..f64818876b 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -515,6 +515,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: } // Serialize and return the task val startTime = System.currentTimeMillis + // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here + // we assume the task can be serialized without exceptions. val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = System.currentTimeMillis - startTime diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala index bbce9eda64..a9b49cad0e 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala @@ -114,6 +114,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL) taskInfos(taskId) = info + // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here + // we assume the task can be serialized without exceptions. val bytes = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes") |