 core/src/main/scala/spark/JavaSerializer.scala                 | 6 ++++--
 core/src/main/scala/spark/SparkContext.scala                   | 2 +-
 core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala | 6 ++++--
 3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index ec5c33d1df..c17ec995d4 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -3,6 +3,8 @@ package spark
 import java.io._
 import java.nio.ByteBuffer
 
+import spark.util.ByteBufferInputStream
+
 class JavaSerializationStream(out: OutputStream) extends SerializationStream {
   val objOut = new ObjectOutputStream(out)
   def writeObject[T](t: T) { objOut.writeObject(t) }
@@ -31,13 +33,13 @@ class JavaSerializerInstance extends SerializerInstance {
   }
 
   def deserialize[T](bytes: ByteBuffer): T = {
-    val bis = new ByteArrayInputStream(bytes.array())
+    val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis)
     in.readObject().asInstanceOf[T]
   }
 
   def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes.array())
+    val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis, loader)
     in.readObject().asInstanceOf[T]
   }
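
Why this change matters: ByteBuffer.array() only works for buffers backed by an accessible array. It throws ReadOnlyBufferException for read-only buffers (such as the one status.getData.asReadOnlyByteBuffer produces in the TaskSetManager diff below) and UnsupportedOperationException for direct buffers, and even when it succeeds it ignores position() and arrayOffset(). Streaming bytes straight out of the buffer sidesteps all of that and avoids a copy. The following is a minimal sketch of the idea behind spark.util.ByteBufferInputStream, not that class's actual implementation:

    import java.io.InputStream
    import java.nio.ByteBuffer

    // Minimal sketch of an InputStream over a ByteBuffer (illustration only,
    // not the real spark.util.ByteBufferInputStream).
    class ByteBufferInputStreamSketch(buffer: ByteBuffer) extends InputStream {
      // Read one byte; mask to 0..255 so bytes 0x80..0xFF don't come back negative.
      override def read(): Int =
        if (!buffer.hasRemaining) -1 else buffer.get() & 0xFF

      // Bulk read straight out of the buffer, honoring its current position.
      override def read(dest: Array[Byte], offset: Int, length: Int): Int =
        if (!buffer.hasRemaining) -1
        else {
          val amount = math.min(length, buffer.remaining())
          buffer.get(dest, offset, amount)
          amount
        }
    }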
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index eeaf1d7c11..8bb60b9845 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -93,7 +93,7 @@ class SparkContext(
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         new LocalScheduler(threads.toInt, maxFailures.toInt)
       case _ =>
-        System.loadLibrary("mesos")
+        MesosNativeLibrary.load()
         if (System.getProperty("spark.mesos.coarse", "false") == "true") {
           new CoarseMesosScheduler(this, master, frameworkName)
         } else {
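
MesosNativeLibrary.load() comes from the Mesos Java bindings and replaces the bare System.loadLibrary("mesos"), which can only find libmesos via java.library.path. Roughly, a loader like this can also honor an explicit library path (for example the MESOS_NATIVE_LIBRARY environment variable) and tolerate repeated calls. The sketch below is an assumption about that behavior for illustration, not the bindings' actual code:

    // Hypothetical sketch of a native-library loader in the spirit of
    // MesosNativeLibrary.load(); the env-var fallback logic and idempotence
    // guard shown here are assumptions.
    object NativeLoaderSketch {
      private var loaded = false

      def load(): Unit = synchronized {
        if (!loaded) {
          sys.env.get("MESOS_NATIVE_LIBRARY") match {
            case Some(path) => System.load(path)           // explicit path to libmesos
            case None       => System.loadLibrary("mesos") // search java.library.path
          }
          loaded = true
        }
      }
    }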
diff --git a/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala
index 535c17d9d4..a8bebf8e50 100644
--- a/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala
@@ -267,7 +267,8 @@ class TaskSetManager(
         logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
           tid, info.duration, tasksFinished, numTasks))
         // Deserialize task result and pass it to the scheduler
-        val result = ser.deserialize[TaskResult[_]](status.getData.asReadOnlyByteBuffer)
+        val result = ser.deserialize[TaskResult[_]](
+          status.getData.asReadOnlyByteBuffer, getClass.getClassLoader)
         sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
         // Mark finished and stop if we've finished all the tasks
         finished(index) = true
@@ -291,7 +292,8 @@
         // Check if the problem is a map output fetch failure. In that case, this
         // task will never succeed on any node, so tell the scheduler about it.
         if (status.getData != null && status.getData.size > 0) {
-          val reason = ser.deserialize[TaskEndReason](
+          val reason = ser.deserialize[TaskEndReason](
+            status.getData.asReadOnlyByteBuffer, getClass.getClassLoader)
           reason match {
             case fetchFailed: FetchFailed =>
               logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)