-rw-r--r--  README.md                                      |  12
-rwxr-xr-x  conf/spark-env.sh.template                     |   2
-rw-r--r--  core/lib/mesos-0.9.0.jar                       | bin 0 -> 264708 bytes
-rw-r--r--  core/lib/mesos.jar                             | bin 147412 -> 0 bytes
-rw-r--r--  core/src/main/scala/spark/Executor.scala       |  34
-rw-r--r--  core/src/main/scala/spark/Job.scala            |   4
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala |  49
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala      |  13
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala   |   4
-rw-r--r--  project/SparkBuild.scala                       |  10
-rw-r--r--  repl/src/test/scala/spark/repl/ReplSuite.scala |   2
-rwxr-xr-x  run                                            |  44
12 files changed, 109 insertions, 65 deletions
diff --git a/README.md b/README.md
@@ -37,6 +37,15 @@
 to connect to. This can be a Mesos URL, or "local" to run locally with one
 thread, or "local[N]" to run locally with N threads.
 
+## A Note About Hadoop
+
+Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
+storage systems. Because the HDFS API has changed in different versions of
+Hadoop, you must build Spark against the same version that your cluster runs.
+You can change the version by setting the `HADOOP_VERSION` variable at the top
+of `project/SparkBuild.scala`, then rebuilding Spark.
+
+
 ## Configuration
 
 Spark can be configured through two files: `conf/java-opts` and
@@ -58,5 +67,8 @@ several Spark-specific variables you can set:
 - `SPARK_JAVA_OPTS`: Extra options to pass to JVM.
 
+- `MESOS_NATIVE_LIBRARY`: Your Mesos library, if you want to run on a Mesos
+  cluster. For example, this might be `/usr/local/lib/libmesos.so` on Linux.
+
 Note that `spark-env.sh` must be a shell script (it must be executable and
 start with a `#!` header to specify the shell to use).
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 6852b23a34..532a635a1b 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -2,7 +2,7 @@
 # Set Spark environment variables for your site in this file. Some useful
 # variables to set are:
-# - MESOS_HOME, to point to your Mesos installation
+# - MESOS_NATIVE_LIBRARY, to point to your Mesos native library (libmesos.so)
 # - SCALA_HOME, to point to your Scala installation
 # - SPARK_CLASSPATH, to add elements to Spark's classpath
 # - SPARK_JAVA_OPTS, to add JVM options
diff --git a/core/lib/mesos-0.9.0.jar b/core/lib/mesos-0.9.0.jar
new file mode 100644
index 0000000000..b7ad79bf2a
--- /dev/null
+++ b/core/lib/mesos-0.9.0.jar
Binary files differ
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
deleted file mode 100644
index 941966c46a..0000000000
--- a/core/lib/mesos.jar
+++ /dev/null
Binary files differ
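As a point of reference, a site-local conf/spark-env.sh written against the updated template might look like the sketch below; every path here is an example, not something this patch requires:

    #!/usr/bin/env bash
    # Example values only -- point these at the installations on your machines.
    export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so   # Mesos native library
    export SCALA_HOME=/usr/local/scala                       # Scala installation
    export SPARK_MEM=1g                                      # memory per executor (run defaults to 512m)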
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 71a2ded7e7..de45137a4f 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -24,9 +24,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
 
   initLogging()
 
-  override def init(d: ExecutorDriver, args: ExecutorArgs) {
+  override def registered(
+      driver: ExecutorDriver,
+      executorInfo: ExecutorInfo,
+      frameworkInfo: FrameworkInfo,
+      slaveInfo: SlaveInfo) {
     // Read spark.* system properties from executor arg
-    val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
+    val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
     for ((key, value) <- props) {
       System.setProperty(key, value)
     }
@@ -48,25 +52,29 @@ class Executor extends org.apache.mesos.Executor with Logging {
     threadPool = new ThreadPoolExecutor(
       1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
   }
+
+  override def disconnected(d: ExecutorDriver) {}
+
+  override def reregistered(d: ExecutorDriver, s: SlaveInfo) {}
 
-  override def launchTask(d: ExecutorDriver, task: TaskDescription) {
+  override def launchTask(d: ExecutorDriver, task: TaskInfo) {
     threadPool.execute(new TaskRunner(task, d))
   }
 
-  class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
+  class TaskRunner(info: TaskInfo, d: ExecutorDriver)
   extends Runnable {
     override def run() = {
-      val tid = desc.getTaskId.getValue
+      val tid = info.getTaskId.getValue
       logInfo("Running task ID " + tid)
       d.sendStatusUpdate(TaskStatus.newBuilder()
-        .setTaskId(desc.getTaskId)
+        .setTaskId(info.getTaskId)
        .setState(TaskState.TASK_RUNNING)
        .build())
      try {
        SparkEnv.set(env)
        Thread.currentThread.setContextClassLoader(classLoader)
        Accumulators.clear
-        val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
+        val task = Utils.deserialize[Task[Any]](info.getData.toByteArray, classLoader)
        for (gen <- task.generation) {// Update generation if any is set
          env.mapOutputTracker.updateGeneration(gen)
        }
@@ -74,7 +82,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
        val accumUpdates = Accumulators.values
        val result = new TaskResult(value, accumUpdates)
        d.sendStatusUpdate(TaskStatus.newBuilder()
-          .setTaskId(desc.getTaskId)
+          .setTaskId(info.getTaskId)
          .setState(TaskState.TASK_FINISHED)
          .setData(ByteString.copyFrom(Utils.serialize(result)))
          .build())
@@ -83,7 +91,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
        case ffe: FetchFailedException => {
          val reason = ffe.toTaskEndReason
          d.sendStatusUpdate(TaskStatus.newBuilder()
-            .setTaskId(desc.getTaskId)
+            .setTaskId(info.getTaskId)
            .setState(TaskState.TASK_FAILED)
            .setData(ByteString.copyFrom(Utils.serialize(reason)))
            .build())
@@ -91,7 +99,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
        case t: Throwable => {
          val reason = ExceptionFailure(t)
          d.sendStatusUpdate(TaskStatus.newBuilder()
-            .setTaskId(desc.getTaskId)
+            .setTaskId(info.getTaskId)
            .setState(TaskState.TASK_FAILED)
            .setData(ByteString.copyFrom(Utils.serialize(reason)))
            .build())
@@ -154,8 +162,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
     Utils.copyStream(in, out, true)
   }
 
-  override def error(d: ExecutorDriver, code: Int, message: String) {
-    logError("Error from Mesos: %s (code %d)".format(message, code))
+  override def error(d: ExecutorDriver, message: String) {
+    logError("Error from Mesos: " + message)
   }
 
   override def killTask(d: ExecutorDriver, t: TaskID) {
@@ -172,7 +180,7 @@
 */
 object Executor extends Logging {
   def main(args: Array[String]) {
-    System.loadLibrary("mesos")
+    MesosNativeLibrary.load()
     // Create a new Executor and start it running
     val exec = new Executor
     new MesosExecutorDriver(exec).run()
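Each branch of TaskRunner above follows the same Mesos 0.9 reporting pattern: build a TaskStatus protobuf carrying the task ID, the new state, and an optional serialized payload, then hand it to the driver. A hypothetical helper capturing that pattern (the object and method names are illustrative, not part of this change):

    import com.google.protobuf.ByteString
    import org.apache.mesos.ExecutorDriver
    import org.apache.mesos.Protos._

    // Illustrative sketch of the status-update pattern used by spark.Executor.TaskRunner above.
    object StatusUpdates {
      def report(d: ExecutorDriver, taskId: TaskID, state: TaskState, payload: Array[Byte] = null) {
        val builder = TaskStatus.newBuilder()
          .setTaskId(taskId)
          .setState(state)
        if (payload != null) {
          // Attach a serialized result or failure reason, as TaskRunner does.
          builder.setData(ByteString.copyFrom(payload))
        }
        d.sendStatusUpdate(builder.build())
      }
    }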
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 0d68470c03..b7b0361c62 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,9 +8,9 @@ import org.apache.mesos.Protos._
  * callbacks.
  */
 abstract class Job(val runId: Int, val jobId: Int) {
-  def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
+  def slaveOffer(s: Offer, availableCpus: Double): Option[TaskInfo]
 
   def statusUpdate(t: TaskStatus): Unit
 
-  def error(code: Int, message: String): Unit
+  def error(message: String): Unit
 }
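With the revised interface, a Job answers offers with Mesos 0.9 TaskInfo objects and receives plain error strings instead of (code, message) pairs. A do-nothing stub (purely illustrative; SimpleJob below is the real implementation) would look like:

    import org.apache.mesos.Protos._

    // Hypothetical stub showing the updated Job signatures.
    class NoopJob(runId: Int, jobId: Int) extends Job(runId, jobId) {
      def slaveOffer(s: Offer, availableCpus: Double): Option[TaskInfo] = None
      def statusUpdate(t: TaskStatus): Unit = {}
      def error(message: String): Unit = {}
    }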
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index b95f40b877..391e5f1714 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -76,6 +76,9 @@ private class MesosScheduler(
   // URIs of JARs to pass to executor
   var jarUris: String = ""
 
+  // Create an ExecutorInfo for our tasks
+  val executorInfo = createExecutorInfo()
+
   // Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
   private val jobOrdering = new Ordering[Job] {
     override def compare(j1: Job, j2: Job): Int = {
@@ -105,7 +108,8 @@ private class MesosScheduler(
       setDaemon(true)
       override def run {
         val sched = MesosScheduler.this
-        driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
+        val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+        driver = new MesosSchedulerDriver(sched, fwInfo, master)
        try {
          val ret = driver.run()
          logInfo("driver.run() returned with code " + ret)
@@ -116,7 +120,7 @@ private class MesosScheduler(
     }.start
   }
 
-  def getExecutorInfo(): ExecutorInfo = {
+  def createExecutorInfo(): ExecutorInfo = {
     val sparkHome = sc.getSparkHome match {
       case Some(path) => path
       case None =>
@@ -124,25 +128,28 @@ private class MesosScheduler(
           "property, the SPARK_HOME environment variable or the SparkContext constructor")
     }
     val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
-    val params = Params.newBuilder()
+    val environment = Environment.newBuilder()
     for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
       if (System.getenv(key) != null) {
-        params.addParam(Param.newBuilder()
-          .setKey("env." + key)
-          .setValue(System.getenv(key))
-          .build())
+        environment.addVariables(Environment.Variable.newBuilder()
+          .setName(key)
+          .setValue(System.getenv(key))
+          .build())
       }
     }
     val memory = Resource.newBuilder()
       .setName("mem")
-      .setType(Resource.Type.SCALAR)
-      .setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+      .build()
+    val command = CommandInfo.newBuilder()
+      .setValue(execScript)
+      .setEnvironment(environment)
       .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
-      .setUri(execScript)
+      .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
-      .setParams(params.build())
       .addResources(memory)
       .build()
   }
@@ -171,7 +178,7 @@ private class MesosScheduler(
     }
   }
 
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID) {
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     logInfo("Registered as framework ID " + frameworkId.getValue)
     registeredLock.synchronized {
       isRegistered = true
@@ -187,6 +194,10 @@ private class MesosScheduler(
     }
   }
 
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
   /**
    * Method called by Mesos to offer resources on slaves. We resond by asking our active jobs for
    * tasks in FIFO order. We fill each node with tasks in a round-robin manner so that tasks are
@@ -194,7 +205,7 @@
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     synchronized {
-      val tasks = offers.map(o => new JArrayList[TaskDescription])
+      val tasks = offers.map(o => new JArrayList[TaskInfo])
       val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
       val enoughMem = offers.map(o => {
         val mem = getResource(o.getResourcesList(), "mem")
@@ -281,14 +292,14 @@ private class MesosScheduler(
     }
   }
 
-  override def error(d: SchedulerDriver, code: Int, message: String) {
-    logError("Mesos error: %s (error code: %d)".format(message, code))
+  override def error(d: SchedulerDriver, message: String) {
+    logError("Mesos error: " + message)
     synchronized {
       if (activeJobs.size > 0) {
         // Have each job throw a SparkException with the error
         for ((jobId, activeJob) <- activeJobs) {
           try {
-            activeJob.error(code, message)
+            activeJob.error(message)
           } catch {
             case e: Exception => logError("Exception in error callback", e)
           }
@@ -367,14 +378,18 @@ private class MesosScheduler(
 
   override def frameworkMessage(
       d: SchedulerDriver,
-      s: SlaveID,
       e: ExecutorID,
+      s: SlaveID,
       b: Array[Byte]) {}
 
   override def slaveLost(d: SchedulerDriver, s: SlaveID) {
     slavesWithExecutors.remove(s.getValue)
   }
 
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+    slavesWithExecutors.remove(s.getValue)
+  }
+
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   /**
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 5e42ae6ecd..796498cfe4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -141,7 +141,7 @@ class SimpleJob(
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
+  def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskInfo] = {
     if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -167,15 +167,16 @@ class SimpleJob(
         // Create and return the Mesos task object
         val cpuRes = Resource.newBuilder()
           .setName("cpus")
-          .setType(Resource.Type.SCALAR)
-          .setScalar(Resource.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
+          .setType(Value.Type.SCALAR)
+          .setScalar(Value.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
           .build()
         val serializedTask = Utils.serialize(task)
         logDebug("Serialized size: " + serializedTask.size)
         val taskName = "task %d:%d".format(jobId, index)
-        return Some(TaskDescription.newBuilder()
+        return Some(TaskInfo.newBuilder()
           .setTaskId(taskId)
           .setSlaveId(offer.getSlaveId)
+          .setExecutor(sched.executorInfo)
           .setName(taskName)
           .addResources(cpuRes)
           .setData(ByteString.copyFrom(serializedTask))
@@ -290,9 +291,9 @@ class SimpleJob(
     }
   }
 
-  def error(code: Int, message: String) {
+  def error(message: String) {
     // Save the error message
-    abort("Mesos error: %s (error code: %d)".format(message, code))
+    abort("Mesos error: " + message)
   }
 
   def abort(message: String) {
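The task-construction pattern introduced in SimpleJob.slaveOffer is the heart of the 0.9 migration: scalar resources are now expressed through Value.Scalar, and each TaskInfo carries the ExecutorInfo that should run it. A self-contained sketch of that pattern (the object name, task name, and the 1.0-CPU figure are illustrative, not part of this change):

    import com.google.protobuf.ByteString
    import org.apache.mesos.Protos._

    // Illustrative Mesos 0.9 task description, mirroring SimpleJob.slaveOffer above.
    object TaskInfoExample {
      def makeTask(taskId: TaskID, offer: Offer, executor: ExecutorInfo, payload: Array[Byte]): TaskInfo = {
        // Scalar resources (cpus, mem) now go through Value.Type.SCALAR / Value.Scalar.
        val cpus = Resource.newBuilder()
          .setName("cpus")
          .setType(Value.Type.SCALAR)
          .setScalar(Value.Scalar.newBuilder().setValue(1.0).build())
          .build()
        // TaskDescription is gone; TaskInfo also names the executor that runs the task.
        TaskInfo.newBuilder()
          .setTaskId(taskId)
          .setSlaveId(offer.getSlaveId)
          .setExecutor(executor)
          .setName("example task")
          .addResources(cpus)
          .setData(ByteString.copyFrom(payload))
          .build()
      }
    }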
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bcc18e44fe..6e019d6e7f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 
+import org.apache.mesos.MesosNativeLibrary
+
 import spark.broadcast._
 
 class SparkContext(
@@ -72,7 +74,7 @@ class SparkContext(
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         new LocalScheduler(threads.toInt, maxFailures.toInt)
       case _ =>
-        System.loadLibrary("mesos")
+        MesosNativeLibrary.load()
         new MesosScheduler(this, master, frameworkName)
     }
   }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4a1767a057..08c5a990b4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -4,6 +4,9 @@ import sbtassembly.Plugin._
 import AssemblyKeys._
 
 object SparkBuild extends Build {
+  // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
+  // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
+  val HADOOP_VERSION = "0.20.205.0"
 
   lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
 
@@ -41,7 +44,8 @@ object SparkBuild extends Build {
     name := "spark-core",
     resolvers ++= Seq(
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
-      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"
+      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+      "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
     ),
     libraryDependencies ++= Seq(
       "com.google.guava" % "guava" % "11.0.1",
@@ -49,9 +53,9 @@ object SparkBuild extends Build {
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
       "com.ning" % "compress-lzf" % "0.8.4",
-      "org.apache.hadoop" % "hadoop-core" % "0.20.2",
+      "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
       "asm" % "asm-all" % "3.3.1",
-      "com.google.protobuf" % "protobuf-java" % "2.3.0",
+      "com.google.protobuf" % "protobuf-java" % "2.4.1",
       "de.javakaffee" % "kryo-serializers" % "0.9",
       "org.jboss.netty" % "netty" % "3.2.6.Final",
       "it.unimi.dsi" % "fastutil" % "6.4.2"
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index b8442238b8..15ebf0c9b8 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -119,7 +119,7 @@ class ReplSuite extends FunSuite {
     assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
   }
 
-  if (System.getenv("MESOS_HOME") != null) {
+  if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
     test ("running on Mesos") {
       val output = runInterpreter("localquiet", """
         var v = 7
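Following the new README note, building against a different Hadoop release is a one-line edit plus a rebuild. For a hypothetical Cloudera cluster, the relevant lines at the top of project/SparkBuild.scala would read roughly as below (the CDH version string is the example already given in the comment, and the Cloudera resolver added above supplies the artifact):

    // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
    // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
    val HADOOP_VERSION = "0.20.2-cdh3u3"

    // ... later, in the spark-core dependencies, the version is picked up here:
    "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,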
diff --git a/run b/run
@@ -13,22 +13,22 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
 fi
 
+# If the user specifies a Mesos JAR, put it before our included one on the classpath
 MESOS_CLASSPATH=""
-MESOS_LIBRARY_PATH=""
-
-if [ "x$MESOS_HOME" != "x" ] ; then
-  MESOS_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar"
-  MESOS_LIBRARY_PATH="$MESOS_HOME/lib/java"
+if [ "x$MESOS_JAR" != "x" ] ; then
+  MESOS_CLASSPATH="$MESOS_JAR"
 fi
 
+# Figure out how much memory to use per executor and set it as an environment
+# variable so that our process sees it and can report it to Mesos
 if [ "x$SPARK_MEM" == "x" ] ; then
   SPARK_MEM="512m"
 fi
-export SPARK_MEM # So that the process sees it and can report it to Mesos
+export SPARK_MEM
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/lib:$FWDIR/src/main/native:$MESOS_LIBRARY_PATH"
+JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH"
 JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e $FWDIR/conf/java-opts ] ; then
@@ -36,35 +36,37 @@ if [ -e $FWDIR/conf/java-opts ] ; then
 fi
 export JAVA_OPTS
 
-CORE_DIR=$FWDIR/core
-REPL_DIR=$FWDIR/repl
-EXAMPLES_DIR=$FWDIR/examples
-BAGEL_DIR=$FWDIR/bagel
+CORE_DIR="$FWDIR/core"
+REPL_DIR="$FWDIR/repl"
+EXAMPLES_DIR="$FWDIR/examples"
+BAGEL_DIR="$FWDIR/bagel"
 
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes:$MESOS_CLASSPATH"
-CLASSPATH+=:$FWDIR/conf
-CLASSPATH+=:$REPL_DIR/target/scala-$SCALA_VERSION/classes
-CLASSPATH+=:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes
+CLASSPATH="$SPARK_CLASSPATH"
+CLASSPATH+=":$MESOS_CLASSPATH"
+CLASSPATH+=":$FWDIR/conf"
+CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
 for jar in `find $CORE_DIR/lib -name '*jar'`; do
-  CLASSPATH+=:$jar
+  CLASSPATH+=":$jar"
 done
 for jar in `find $FWDIR/lib_managed/jars -name '*jar'`; do
-  CLASSPATH+=:$jar
+  CLASSPATH+=":$jar"
 done
 for jar in `find $FWDIR/lib_managed/bundles -name '*jar'`; do
-  CLASSPATH+=:$jar
+  CLASSPATH+=":$jar"
 done
 for jar in `find $REPL_DIR/lib -name '*jar'`; do
-  CLASSPATH+=:$jar
+  CLASSPATH+=":$jar"
 done
 CLASSPATH+=:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes
 export CLASSPATH # Needed for spark-shell
 
 if [ -n "$SCALA_HOME" ]; then
-  SCALA=${SCALA_HOME}/bin/scala
+  SCALA="${SCALA_HOME}/bin/scala"
 else
   SCALA=scala
 fi
 
-exec $SCALA -cp $CLASSPATH "$@"
+exec "$SCALA" -cp "$CLASSPATH" "$@"
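A hypothetical session exercising the knobs this patch introduces might look as follows; every path, address, and class name is an example rather than something the change mandates:

    # Point Spark at the Mesos native library (replaces the old MESOS_HOME setting).
    export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
    # Optionally use a different Mesos JAR than the bundled core/lib/mesos-0.9.0.jar.
    export MESOS_JAR=$HOME/mesos/mesos-0.9.0.jar
    # Memory per executor, read by the run script (defaults to 512m).
    export SPARK_MEM=1g
    # Run an example program against a Mesos master (master address is illustrative).
    ./run spark.examples.SparkPi 127.0.0.1:5050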