diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-03-17 12:31:34 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-03-17 12:31:34 -0700 |
commit | a099a63a8ae23a047f3e8ef39c9d8469c84d125d (patch) | |
tree | ebda6abffc0f3df49a3e70b3d467c865760356ed | |
parent | a5e2b6a6bd682eac2859b9105feabaf95ec1d059 (diff) | |
download | spark-a099a63a8ae23a047f3e8ef39c9d8469c84d125d.tar.gz spark-a099a63a8ae23a047f3e8ef39c9d8469c84d125d.tar.bz2 spark-a099a63a8ae23a047f3e8ef39c9d8469c84d125d.zip |
Initial work to make Spark compile with Mesos 0.9 and Hadoop 1.0
-rw-r--r-- | core/lib/mesos-0.9.0.jar | bin | 0 -> 186497 bytes | |||
-rw-r--r-- | core/lib/mesos.jar | bin | 147412 -> 0 bytes | |||
-rw-r--r-- | core/src/main/scala/spark/Executor.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/spark/MesosScheduler.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/SimpleJob.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 4 | ||||
-rw-r--r-- | project/SparkBuild.scala | 8 |
7 files changed, 33 insertions, 17 deletions
diff --git a/core/lib/mesos-0.9.0.jar b/core/lib/mesos-0.9.0.jar Binary files differnew file mode 100644 index 0000000000..d075085191 --- /dev/null +++ b/core/lib/mesos-0.9.0.jar diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar Binary files differdeleted file mode 100644 index 941966c46a..0000000000 --- a/core/lib/mesos.jar +++ /dev/null diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index 71a2ded7e7..1583c083a7 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -24,9 +24,15 @@ class Executor extends org.apache.mesos.Executor with Logging { initLogging() - override def init(d: ExecutorDriver, args: ExecutorArgs) { + override def registered( + driver: ExecutorDriver, + executorInfo: ExecutorInfo, + frameworkId: FrameworkID, + frameworkInfo: FrameworkInfo, + slaveId: SlaveID, + slaveInfo: SlaveInfo) { // Read spark.* system properties from executor arg - val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray) + val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) for ((key, value) <- props) { System.setProperty(key, value) } @@ -172,7 +178,7 @@ class Executor extends org.apache.mesos.Executor with Logging { */ object Executor extends Logging { def main(args: Array[String]) { - System.loadLibrary("mesos") + MesosNativeLibrary.load() // Create a new Executor and start it running val exec = new Executor new MesosExecutorDriver(exec).run() diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index ee14d091ce..bc9a65457d 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -124,25 +124,29 @@ private class MesosScheduler( "property, the SPARK_HOME environment variable or the SparkContext constructor") } val execScript = new File(sparkHome, "spark-executor").getCanonicalPath - val params = Params.newBuilder() + val environment = Environment.newBuilder() for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { if (System.getenv(key) != null) { - params.addParam(Param.newBuilder() - .setKey("env." + key) - .setValue(System.getenv(key)) - .build()) + environment.addVariables(Environment.Variable.newBuilder() + .setName(key) + .setValue(System.getenv(key)) + .build()) } } + environment.build() val memory = Resource.newBuilder() .setName("mem") - .setType(Resource.Type.SCALAR) - .setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build()) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build()) + .build() + val command = CommandInfo.newBuilder() + .setValue(execScript) + .setEnvironment(environment) .build() ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue("default").build()) - .setUri(execScript) + .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) - .setParams(params.build()) .addResources(memory) .build() } diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala index 5e42ae6ecd..2050558285 100644 --- a/core/src/main/scala/spark/SimpleJob.scala +++ b/core/src/main/scala/spark/SimpleJob.scala @@ -167,8 +167,8 @@ class SimpleJob( // Create and return the Mesos task object val cpuRes = Resource.newBuilder() .setName("cpus") - .setType(Resource.Type.SCALAR) - .setScalar(Resource.Scalar.newBuilder().setValue(CPUS_PER_TASK).build()) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(CPUS_PER_TASK).build()) .build() val serializedTask = Utils.serialize(task) logDebug("Serialized size: " + serializedTask.size) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d543fb9c95..6f7b0a7fec 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} +import org.apache.mesos.MesosNativeLibrary + import spark.broadcast._ class SparkContext( @@ -72,7 +74,7 @@ class SparkContext( case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => new LocalScheduler(threads.toInt, maxFailures.toInt) case _ => - System.loadLibrary("mesos") + MesosNativeLibrary.load() new MesosScheduler(this, master, frameworkName) } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 120439cfc9..f2c1e95c92 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -4,6 +4,9 @@ import sbtassembly.Plugin._ import AssemblyKeys._ object SparkBuild extends Build { + // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or + // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop. + val HADOOP_VERSION = "0.20.205.0" lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel) @@ -41,7 +44,8 @@ object SparkBuild extends Build { name := "spark-core", resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", - "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/" + "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", + "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/" ), libraryDependencies ++= Seq( "com.google.guava" % "guava" % "11.0.1", @@ -49,7 +53,7 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "com.ning" % "compress-lzf" % "0.8.4", - "org.apache.hadoop" % "hadoop-core" % "0.20.2", + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.3.0", "de.javakaffee" % "kryo-serializers" % "0.9", |