diff options
author | root <root@domU-12-31-39-15-32-8F.compute-1.internal> | 2012-06-20 06:48:26 +0000 |
---|---|---|
committer | root <root@domU-12-31-39-15-32-8F.compute-1.internal> | 2012-06-20 06:48:26 +0000 |
commit | 6ad3e1f1b4c35200426a44360dc2b3477c62459b (patch) | |
tree | 0907aff85a5411d45fbaab23ad4d73e501d1168d | |
parent | e896a505e273a5a275f4a4d58470beddea8146df (diff) | |
download | spark-6ad3e1f1b4c35200426a44360dc2b3477c62459b.tar.gz spark-6ad3e1f1b4c35200426a44360dc2b3477c62459b.tar.bz2 spark-6ad3e1f1b4c35200426a44360dc2b3477c62459b.zip |
Various fixes when running on Mesos
-rw-r--r-- | core/src/main/scala/spark/JavaSerializer.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala | 6 |
3 files changed, 9 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala index ec5c33d1df..c17ec995d4 100644 --- a/core/src/main/scala/spark/JavaSerializer.scala +++ b/core/src/main/scala/spark/JavaSerializer.scala @@ -3,6 +3,8 @@ package spark import java.io._ import java.nio.ByteBuffer +import spark.util.ByteBufferInputStream + class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) def writeObject[T](t: T) { objOut.writeObject(t) } @@ -31,13 +33,13 @@ class JavaSerializerInstance extends SerializerInstance { } def deserialize[T](bytes: ByteBuffer): T = { - val bis = new ByteArrayInputStream(bytes.array()) + val bis = new ByteBufferInputStream(bytes) val in = deserializeStream(bis) in.readObject().asInstanceOf[T] } def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { - val bis = new ByteArrayInputStream(bytes.array()) + val bis = new ByteBufferInputStream(bytes) val in = deserializeStream(bis, loader) in.readObject().asInstanceOf[T] } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index eeaf1d7c11..8bb60b9845 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -93,7 +93,7 @@ class SparkContext( case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => new LocalScheduler(threads.toInt, maxFailures.toInt) case _ => - System.loadLibrary("mesos") + MesosNativeLibrary.load() if (System.getProperty("spark.mesos.coarse", "false") == "true") { new CoarseMesosScheduler(this, master, frameworkName) } else { diff --git a/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala index 535c17d9d4..a8bebf8e50 100644 --- a/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/mesos/TaskSetManager.scala @@ -267,7 +267,8 @@ class TaskSetManager( logInfo("Finished TID %s in %d ms (progress: %d/%d)".format( tid, info.duration, tasksFinished, numTasks)) // Deserialize task result and pass it to the scheduler - val result = ser.deserialize[TaskResult[_]](status.getData.asReadOnlyByteBuffer) + val result = ser.deserialize[TaskResult[_]]( + status.getData.asReadOnlyByteBuffer, getClass.getClassLoader) sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates) // Mark finished and stop if we've finished all the tasks finished(index) = true @@ -291,7 +292,8 @@ class TaskSetManager( // Check if the problem is a map output fetch failure. In that case, this // task will never succeed on any node, so tell the scheduler about it. if (status.getData != null && status.getData.size > 0) { - val reason = ser.deserialize[TaskEndReason](status.getData.asReadOnlyByteBuffer) + val reason = ser.deserialize[TaskEndReason]( + status.getData.asReadOnlyByteBuffer, getClass.getClassLoader) reason match { case fetchFailed: FetchFailed => logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress) |