 core/lib/mesos-0.9.0.jar                       | Bin 0 -> 186497 bytes
 core/lib/mesos.jar                             | Bin 147412 -> 0 bytes
 core/src/main/scala/spark/Executor.scala       |  12 +++++++++---
 core/src/main/scala/spark/MesosScheduler.scala |  22 +++++++++++++---------
 core/src/main/scala/spark/SimpleJob.scala      |   4 ++--
 core/src/main/scala/spark/SparkContext.scala   |   4 +++-
 project/SparkBuild.scala                       |   8 ++++++--
 7 files changed, 33 insertions(+), 17 deletions(-)
diff --git a/core/lib/mesos-0.9.0.jar b/core/lib/mesos-0.9.0.jar
new file mode 100644
index 0000000000..d075085191
--- /dev/null
+++ b/core/lib/mesos-0.9.0.jar
Binary files differ
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
deleted file mode 100644
index 941966c46a..0000000000
--- a/core/lib/mesos.jar
+++ /dev/null
Binary files differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 71a2ded7e7..1583c083a7 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -24,9 +24,15 @@ class Executor extends org.apache.mesos.Executor with Logging {
initLogging()
- override def init(d: ExecutorDriver, args: ExecutorArgs) {
+ override def registered(
+ driver: ExecutorDriver,
+ executorInfo: ExecutorInfo,
+ frameworkId: FrameworkID,
+ frameworkInfo: FrameworkInfo,
+ slaveId: SlaveID,
+ slaveInfo: SlaveInfo) {
// Read spark.* system properties from executor arg
- val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
+ val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
for ((key, value) <- props) {
System.setProperty(key, value)
}
@@ -172,7 +178,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
*/
object Executor extends Logging {
def main(args: Array[String]) {
- System.loadLibrary("mesos")
+ MesosNativeLibrary.load()
// Create a new Executor and start it running
val exec = new Executor
new MesosExecutorDriver(exec).run()
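
Note: Mesos 0.9.0 replaces the old single init(driver, args) executor callback with registered(...), and the framework's opaque setup payload moves from ExecutorArgs.getData to ExecutorInfo.getData, as the hunk above shows. A minimal sketch of the new callback shape (left abstract so the remaining Executor callbacks can be omitted; the payload decoding mirrors the Utils.deserialize call above):

    import org.apache.mesos.{Executor, ExecutorDriver}
    import org.apache.mesos.Protos._

    // Sketch only: the other Executor callbacks (launchTask, killTask,
    // frameworkMessage, shutdown, error) are left abstract here.
    abstract class SketchExecutor extends Executor {
      override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo,
                              frameworkId: FrameworkID, frameworkInfo: FrameworkInfo,
                              slaveId: SlaveID, slaveInfo: SlaveInfo) {
        // The scheduler's setup blob now rides in ExecutorInfo.data
        // (previously ExecutorArgs.data).
        val payload: Array[Byte] = executorInfo.getData.toByteArray
      }
    }
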
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index ee14d091ce..bc9a65457d 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -124,25 +124,29 @@ private class MesosScheduler(
"property, the SPARK_HOME environment variable or the SparkContext constructor")
}
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
- val params = Params.newBuilder()
+ val environment = Environment.newBuilder()
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
- params.addParam(Param.newBuilder()
- .setKey("env." + key)
- .setValue(System.getenv(key))
- .build())
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(System.getenv(key))
+ .build())
}
}
+ environment.build()
val memory = Resource.newBuilder()
.setName("mem")
- .setType(Resource.Type.SCALAR)
- .setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+ .build()
+ val command = CommandInfo.newBuilder()
+ .setValue(execScript)
+ .setEnvironment(environment)
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default").build())
- .setUri(execScript)
+ .setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
- .setParams(params.build())
.addResources(memory)
.build()
}
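
Note: the 0.9.0 API drops ExecutorInfo.uri and the key/value Params block; the launch script now travels in a CommandInfo, and environment variables in a typed Environment message, as the hunk above shows. A condensed before/after sketch (the script path and variable value are illustrative):

    // Before (old API): script via setUri, env vars as "env."-prefixed Params:
    //   ExecutorInfo.newBuilder().setUri(execScript).setParams(params).build()
    // After (0.9.0): CommandInfo bundles the script with a typed Environment.
    val env = Environment.newBuilder()
      .addVariables(Environment.Variable.newBuilder()
        .setName("SPARK_MEM").setValue("512m").build())  // illustrative value
      .build()
    val command = CommandInfo.newBuilder()
      .setValue("/path/to/spark-executor")               // illustrative path
      .setEnvironment(env)
      .build()
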
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 5e42ae6ecd..2050558285 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -167,8 +167,8 @@ class SimpleJob(
// Create and return the Mesos task object
val cpuRes = Resource.newBuilder()
.setName("cpus")
- .setType(Resource.Type.SCALAR)
- .setScalar(Resource.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
.build()
val serializedTask = Utils.serialize(task)
logDebug("Serialized size: " + serializedTask.size)
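
Note: the same Protos reshuffle applies to resources; scalar values now live in the shared Value message instead of types nested under Resource. A self-contained example of declaring a one-CPU scalar resource against the 0.9.0 Protos (the amount is illustrative):

    import org.apache.mesos.Protos._

    val cpus = Resource.newBuilder()
      .setName("cpus")
      .setType(Value.Type.SCALAR)                                  // was Resource.Type.SCALAR
      .setScalar(Value.Scalar.newBuilder().setValue(1.0).build())  // was Resource.Scalar
      .build()
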
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d543fb9c95..6f7b0a7fec 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.mesos.MesosNativeLibrary
+
import spark.broadcast._
class SparkContext(
@@ -72,7 +74,7 @@ class SparkContext(
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
new LocalScheduler(threads.toInt, maxFailures.toInt)
case _ =>
- System.loadLibrary("mesos")
+ MesosNativeLibrary.load()
new MesosScheduler(this, master, frameworkName)
}
}
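
Note: MesosNativeLibrary.load() replaces the bare System.loadLibrary("mesos") call here and in Executor above, so the native library can be located without configuring java.library.path. The helper roughly amounts to the following sketch (an assumption about its behavior via the MESOS_NATIVE_LIBRARY environment variable, not the actual Mesos source):

    // Sketch of the assumed behavior of MesosNativeLibrary.load():
    val path = System.getenv("MESOS_NATIVE_LIBRARY")
    if (path != null) {
      System.load(path)            // explicit path to libmesos.so / libmesos.dylib
    } else {
      System.loadLibrary("mesos")  // fall back to java.library.path lookup
    }
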
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 120439cfc9..f2c1e95c92 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -4,6 +4,9 @@ import sbtassembly.Plugin._
import AssemblyKeys._
object SparkBuild extends Build {
+ // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
+ // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
+ val HADOOP_VERSION = "0.20.205.0"
lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
@@ -41,7 +44,8 @@ object SparkBuild extends Build {
name := "spark-core",
resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"
+ "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+ "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "11.0.1",
@@ -49,7 +53,7 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"com.ning" % "compress-lzf" % "0.8.4",
- "org.apache.hadoop" % "hadoop-core" % "0.20.2",
+ "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
"asm" % "asm-all" % "3.3.1",
"com.google.protobuf" % "protobuf-java" % "2.3.0",
"de.javakaffee" % "kryo-serializers" % "0.9",