diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-04-12 17:53:02 +0000 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-04-12 17:53:02 +0000 |
commit | 3b745176e0ec5fda8c7afef04aec1040e1c649a9 (patch) | |
tree | 5bf4e304c845ed7d63da534c7cdae47673e59756 | |
parent | 112655f03201c877b5ff3e43519cde8052909095 (diff) | |
download | spark-3b745176e0ec5fda8c7afef04aec1040e1c649a9.tar.gz spark-3b745176e0ec5fda8c7afef04aec1040e1c649a9.tar.bz2 spark-3b745176e0ec5fda8c7afef04aec1040e1c649a9.zip |
Bug fix to pluggable closure serialization change
-rw-r--r-- | core/src/main/scala/spark/LocalScheduler.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/SimpleJob.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 1 |
3 files changed, 5 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala index 8972d6c290..c86b967aa5 100644 --- a/core/src/main/scala/spark/LocalScheduler.scala +++ b/core/src/main/scala/spark/LocalScheduler.scala @@ -42,10 +42,9 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule val startTime = System.currentTimeMillis val bytes = ser.serialize(task) val timeTaken = System.currentTimeMillis - startTime - logInfo("Size of task %d is %d bytes and took %d ms to serialize by %s" - .format(idInJob, bytes.size, timeTaken, ser.getClass.getName)) - val deserializedTask = ser.deserialize[Task[_]]( - bytes, Thread.currentThread.getContextClassLoader) + logInfo("Size of task %d is %d bytes and took %d ms to serialize".format( + idInJob, bytes.size, timeTaken)) + val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader) val result: Any = deserializedTask.run(attemptId) val accumUpdates = Accumulators.values logInfo("Finished task " + idInJob) diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala index b221c2e309..0f36699d26 100644 --- a/core/src/main/scala/spark/SimpleJob.scala +++ b/core/src/main/scala/spark/SimpleJob.scala @@ -218,7 +218,7 @@ class SimpleJob( logInfo("Finished TID %s (progress: %d/%d)".format(tid, tasksFinished, numTasks)) // Deserialize task result val result = ser.deserialize[TaskResult[_]]( - status.getData.toByteArray) + status.getData.toByteArray, getClass.getClassLoader) sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates) // Mark finished and stop if we've finished all the tasks finished(index) = true @@ -241,7 +241,7 @@ class SimpleJob( // task will never succeed on any node, so tell the scheduler about it. if (status.getData != null && status.getData.size > 0) { val reason = ser.deserialize[TaskEndReason]( - status.getData.toByteArray) + status.getData.toByteArray, getClass.getClassLoader) reason match { case fetchFailed: FetchFailed => logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 55f2e0691d..58b5fa6bbd 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -12,7 +12,6 @@ import scala.util.Random * Various utility methods used by Spark. */ object Utils { - def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) |