author    root <root@domU-12-31-39-15-32-8F.compute-1.internal> 2012-06-20 06:48:26 +0000
committer root <root@domU-12-31-39-15-32-8F.compute-1.internal> 2012-06-20 06:48:26 +0000
commit    6ad3e1f1b4c35200426a44360dc2b3477c62459b
tree      0907aff85a5411d45fbaab23ad4d73e501d1168d
parent    e896a505e273a5a275f4a4d58470beddea8146df
Various fixes when running on Mesos
-rw-r--r--  core/src/main/scala/spark/JavaSerializer.scala                 | 6 ++++--
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                   | 2 +-
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala | 6 ++++--
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index ec5c33d1df..c17ec995d4 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -3,6 +3,8 @@ package spark
 import java.io._
 import java.nio.ByteBuffer
 
+import spark.util.ByteBufferInputStream
+
 class JavaSerializationStream(out: OutputStream) extends SerializationStream {
   val objOut = new ObjectOutputStream(out)
   def writeObject[T](t: T) { objOut.writeObject(t) }
@@ -31,13 +33,13 @@ class JavaSerializerInstance extends SerializerInstance {
   }
 
   def deserialize[T](bytes: ByteBuffer): T = {
-    val bis = new ByteArrayInputStream(bytes.array())
+    val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis)
     in.readObject().asInstanceOf[T]
   }
 
   def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes.array())
+    val bis = new ByteBufferInputStream(bytes)
     val in = deserializeStream(bis, loader)
     in.readObject().asInstanceOf[T]
   }
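
Note on the change above: ByteBuffer.array() requires a buffer backed by an accessible byte array, so it throws for the read-only buffers Mesos hands back via status.getData.asReadOnlyByteBuffer, and it also ignores the buffer's position and limit. Reading through the buffer's own API avoids both problems. Below is a minimal sketch of the idea behind spark.util.ByteBufferInputStream; the class name and details are illustrative, not the actual Spark implementation.

import java.io.InputStream
import java.nio.ByteBuffer

// Sketch: an InputStream that reads through a ByteBuffer's position/limit
// window via get(), never calling array(), so it also works for read-only
// and direct buffers.
class ByteBufferInputStreamSketch(buffer: ByteBuffer) extends InputStream {
  override def read(): Int =
    if (!buffer.hasRemaining) -1
    else buffer.get() & 0xFF // mask so bytes >= 0x80 are not returned as negative

  override def read(dest: Array[Byte], offset: Int, length: Int): Int =
    if (!buffer.hasRemaining) -1
    else {
      val n = math.min(length, buffer.remaining())
      buffer.get(dest, offset, n)
      n
    }
}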
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index eeaf1d7c11..8bb60b9845 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -93,7 +93,7 @@ class SparkContext(
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         new LocalScheduler(threads.toInt, maxFailures.toInt)
       case _ =>
-        System.loadLibrary("mesos")
+        MesosNativeLibrary.load()
         if (System.getProperty("spark.mesos.coarse", "false") == "true") {
           new CoarseMesosScheduler(this, master, frameworkName)
         } else {
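
Note on the change above: a bare System.loadLibrary("mesos") only searches java.library.path, which fails on clusters where libmesos is installed at a location advertised through the environment instead. MesosNativeLibrary.load() is the loader shipped with Mesos itself. The sketch below shows the kind of fallback logic such a loader typically provides; the MESOS_NATIVE_LIBRARY variable name and the exact behavior are assumptions, not something this commit confirms.

// Sketch: honor an explicit library path from the environment before
// falling back to the java.library.path search that loadLibrary performs.
object NativeLoaderSketch {
  def load(): Unit = {
    val explicit = System.getenv("MESOS_NATIVE_LIBRARY") // assumed variable name
    if (explicit != null) {
      System.load(explicit)       // absolute path, e.g. /usr/local/lib/libmesos.so
    } else {
      System.loadLibrary("mesos") // search java.library.path
    }
  }
}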
diff --git a/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala
index 535c17d9d4..a8bebf8e50 100644
--- a/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala
@@ -267,7 +267,8 @@ class TaskSetManager(
       logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
         tid, info.duration, tasksFinished, numTasks))
       // Deserialize task result and pass it to the scheduler
-      val result = ser.deserialize[TaskResult[_]](status.getData.asReadOnlyByteBuffer)
+      val result = ser.deserialize[TaskResult[_]](
+        status.getData.asReadOnlyByteBuffer, getClass.getClassLoader)
       sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
       // Mark finished and stop if we've finished all the tasks
       finished(index) = true
@@ -291,7 +292,8 @@ class TaskSetManager(
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
       if (status.getData != null && status.getData.size > 0) {
-        val reason = ser.deserialize[TaskEndReason](status.getData.asReadOnlyByteBuffer)
+        val reason = ser.deserialize[TaskEndReason](
+          status.getData.asReadOnlyByteBuffer, getClass.getClassLoader)
         reason match {
           case fetchFailed: FetchFailed =>
             logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)