aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala2
2 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index d934293b70..f64818876b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -515,6 +515,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
// Serialize and return the task
val startTime = System.currentTimeMillis
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index bbce9eda64..a9b49cad0e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -114,6 +114,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
TaskLocality.NODE_LOCAL)
taskInfos(taskId) = info
+ // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+ // we assume the task can be serialized without exceptions.
val bytes = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")