author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-04-12 17:53:02 +0000
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-04-12 17:53:02 +0000
commit    3b745176e0ec5fda8c7afef04aec1040e1c649a9 (patch)
tree      5bf4e304c845ed7d63da534c7cdae47673e59756
parent    112655f03201c877b5ff3e43519cde8052909095 (diff)
Bug fix to pluggable closure serialization change
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala  7
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala        4
-rw-r--r--  core/src/main/scala/spark/Utils.scala            1
3 files changed, 5 insertions(+), 7 deletions(-)
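The fix threads an explicit ClassLoader through each deserialize call site. A minimal sketch of the serializer shape those call sites imply (ser.serialize(task), ser.deserialize[T](bytes, loader)); the trait and method names here are illustrative assumptions, not necessarily Spark's exact interface:

// Sketch of a pluggable serializer inferred from the call sites in this
// diff; the trait name is illustrative.
trait SerializerInstance {
  def serialize[T](t: T): Array[Byte]
  // The fix below: deserialization takes a caller-supplied ClassLoader,
  // so classes shipped with a job can be resolved.
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T
}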
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 8972d6c290..c86b967aa5 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -42,10 +42,9 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
val startTime = System.currentTimeMillis
val bytes = ser.serialize(task)
val timeTaken = System.currentTimeMillis - startTime
- logInfo("Size of task %d is %d bytes and took %d ms to serialize by %s"
- .format(idInJob, bytes.size, timeTaken, ser.getClass.getName))
- val deserializedTask = ser.deserialize[Task[_]](
- bytes, Thread.currentThread.getContextClassLoader)
+ logInfo("Size of task %d is %d bytes and took %d ms to serialize".format(
+ idInJob, bytes.size, timeTaken))
+ val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(attemptId)
val accumUpdates = Accumulators.values
logInfo("Finished task " + idInJob)
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index b221c2e309..0f36699d26 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -218,7 +218,7 @@ class SimpleJob(
logInfo("Finished TID %s (progress: %d/%d)".format(tid, tasksFinished, numTasks))
// Deserialize task result
val result = ser.deserialize[TaskResult[_]](
- status.getData.toByteArray)
+ status.getData.toByteArray, getClass.getClassLoader)
sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
finished(index) = true
@@ -241,7 +241,7 @@ class SimpleJob(
// task will never succeed on any node, so tell the scheduler about it.
if (status.getData != null && status.getData.size > 0) {
val reason = ser.deserialize[TaskEndReason](
- status.getData.toByteArray)
+ status.getData.toByteArray, getClass.getClassLoader)
reason match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 55f2e0691d..58b5fa6bbd 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -12,7 +12,6 @@ import scala.util.Random
* Various utility methods used by Spark.
*/
object Utils {
-
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
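The Utils hunk only drops a blank line; the serialize method continues past the visible context. A self-contained sketch of the full method, completing the truncated body with the standard Java-serialization idiom (the tail beyond the shown lines is an assumption):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def serialize[T](o: T): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(o)   // write the object graph with Java serialization
  oos.close()
  bos.toByteArray      // hand back the raw bytes
}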