diff options
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala | 2 |
2 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala index d934293b70..f64818876b 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -515,6 +515,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: } // Serialize and return the task val startTime = System.currentTimeMillis + // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here + // we assume the task can be serialized without exceptions. val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = System.currentTimeMillis - startTime diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala index bbce9eda64..a9b49cad0e 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala @@ -114,6 +114,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL) taskInfos(taskId) = info + // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here + // we assume the task can be serialized without exceptions. val bytes = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes") |