-rw-r--r--   README.md                                       |  12
-rwxr-xr-x   conf/spark-env.sh.template                      |   2
-rw-r--r--   core/lib/mesos-0.9.0.jar                        | bin 0 -> 264708 bytes
-rw-r--r--   core/lib/mesos.jar                              | bin 147412 -> 0 bytes
-rw-r--r--   core/src/main/scala/spark/Executor.scala        |  34
-rw-r--r--   core/src/main/scala/spark/Job.scala             |   4
-rw-r--r--   core/src/main/scala/spark/MesosScheduler.scala  |  62
-rw-r--r--   core/src/main/scala/spark/SimpleJob.scala       |  13
-rw-r--r--   core/src/main/scala/spark/SparkContext.scala    |   4
-rw-r--r--   project/SparkBuild.scala                        |  10
-rw-r--r--   repl/src/test/scala/spark/repl/ReplSuite.scala  |   2
-rwxr-xr-x   run                                             |  44
12 files changed, 115 insertions(+), 72 deletions(-)
diff --git a/README.md b/README.md
index 5f9cd26df3..df9e73e4bd 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,15 @@ to connect to. This can be a Mesos URL, or "local" to run locally with one
thread, or "local[N]" to run locally with N threads.
+## A Note About Hadoop
+
+Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
+storage systems. Because the HDFS API has changed in different versions of
+Hadoop, you must build Spark against the same version that your cluster runs.
+You can change the version by setting the `HADOOP_VERSION` variable at the top
+of `project/SparkBuild.scala`, then rebuilding Spark.
+
+
## Configuration
Spark can be configured through two files: `conf/java-opts` and
@@ -58,5 +67,8 @@ several Spark-specific variables you can set:
- `SPARK_JAVA_OPTS`: Extra options to pass to JVM.
+- `MESOS_NATIVE_LIBRARY`: Your Mesos library, if you want to run on a Mesos
+ cluster. For example, this might be `/usr/local/lib/libmesos.so` on Linux.
+
Note that `spark-env.sh` must be a shell script (it must be executable and start
with a `#!` header to specify the shell to use).
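
For reference, the `HADOOP_VERSION` knob that the new README section points at is the variable this commit adds at the top of `project/SparkBuild.scala`. A minimal sketch of how it is wired into the build (only the Hadoop-related pieces are shown; the project layout here is simplified, not the full build file):

```scala
// Sketch of the Hadoop-version knob added by this commit; layout simplified.
import sbt._
import Keys._

object SparkBuild extends Build {
  // Must match the Hadoop version your cluster runs, since the HDFS client API
  // differs across releases. Examples: "0.20.2", "0.20.205.0", "1.0.1", "0.20.2-cdh3u3".
  val HADOOP_VERSION = "0.20.205.0"

  lazy val core = Project("core", file("core"), settings = Defaults.defaultSettings ++ Seq(
    // The hadoop-core dependency picks up whatever version is set above.
    libraryDependencies += "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION
  ))
}
```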
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 6852b23a34..532a635a1b 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -2,7 +2,7 @@
# Set Spark environment variables for your site in this file. Some useful
# variables to set are:
-# - MESOS_HOME, to point to your Mesos installation
+# - MESOS_NATIVE_LIBRARY, to point to your Mesos native library (libmesos.so)
# - SCALA_HOME, to point to your Scala installation
# - SPARK_CLASSPATH, to add elements to Spark's classpath
# - SPARK_JAVA_OPTS, to add JVM options
diff --git a/core/lib/mesos-0.9.0.jar b/core/lib/mesos-0.9.0.jar
new file mode 100644
index 0000000000..b7ad79bf2a
--- /dev/null
+++ b/core/lib/mesos-0.9.0.jar
Binary files differ
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
deleted file mode 100644
index 941966c46a..0000000000
--- a/core/lib/mesos.jar
+++ /dev/null
Binary files differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index c1795e02a4..c795b6c351 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -24,9 +24,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
initLogging()
- override def init(d: ExecutorDriver, args: ExecutorArgs) {
+ override def registered(
+ driver: ExecutorDriver,
+ executorInfo: ExecutorInfo,
+ frameworkInfo: FrameworkInfo,
+ slaveInfo: SlaveInfo) {
// Read spark.* system properties from executor arg
- val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
+ val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
for ((key, value) <- props) {
System.setProperty(key, value)
}
@@ -48,26 +52,30 @@ class Executor extends org.apache.mesos.Executor with Logging {
threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
}
+
+ override def disconnected(d: ExecutorDriver) {}
+
+ override def reregistered(d: ExecutorDriver, s: SlaveInfo) {}
- override def launchTask(d: ExecutorDriver, task: TaskDescription) {
+ override def launchTask(d: ExecutorDriver, task: TaskInfo) {
threadPool.execute(new TaskRunner(task, d))
}
- class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
+ class TaskRunner(info: TaskInfo, d: ExecutorDriver)
extends Runnable {
override def run() = {
- val tid = desc.getTaskId.getValue
+ val tid = info.getTaskId.getValue
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(classLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID " + tid)
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_RUNNING)
.build())
try {
Accumulators.clear
- val task = ser.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
+ val task = ser.deserialize[Task[Any]](info.getData.toByteArray, classLoader)
for (gen <- task.generation) {// Update generation if any is set
env.mapOutputTracker.updateGeneration(gen)
}
@@ -75,7 +83,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_FINISHED)
.setData(ByteString.copyFrom(ser.serialize(result)))
.build())
@@ -84,7 +92,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_FAILED)
.setData(ByteString.copyFrom(ser.serialize(reason)))
.build())
@@ -92,7 +100,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
case t: Throwable => {
val reason = ExceptionFailure(t)
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_FAILED)
.setData(ByteString.copyFrom(ser.serialize(reason)))
.build())
@@ -155,8 +163,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
Utils.copyStream(in, out, true)
}
- override def error(d: ExecutorDriver, code: Int, message: String) {
- logError("Error from Mesos: %s (code %d)".format(message, code))
+ override def error(d: ExecutorDriver, message: String) {
+ logError("Error from Mesos: " + message)
}
override def killTask(d: ExecutorDriver, t: TaskID) {
@@ -173,7 +181,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
*/
object Executor extends Logging {
def main(args: Array[String]) {
- System.loadLibrary("mesos")
+ MesosNativeLibrary.load()
// Create a new Executor and start it running
val exec = new Executor
new MesosExecutorDriver(exec).run()
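
Taken together, the Executor.scala hunks above move `spark.Executor` from the old `init`/`error(code, message)` callbacks onto the Mesos 0.9.0 executor interface. A self-contained sketch of that interface as this commit uses it (the class name and the trivial task handling are placeholders, not Spark's actual executor logic):

```scala
import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos._

// Minimal sketch of the Mesos 0.9.0 executor callback surface. The real
// spark.Executor runs each task on a thread pool and serializes a TaskResult
// into the status update's data field.
class SketchExecutor extends Executor {
  override def registered(d: ExecutorDriver, executorInfo: ExecutorInfo,
      frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo) {
    // Framework-specific arguments now arrive via executorInfo.getData
    println("Registered on slave " + slaveInfo.getHostname)
  }
  override def reregistered(d: ExecutorDriver, slaveInfo: SlaveInfo) {}
  override def disconnected(d: ExecutorDriver) {}

  override def launchTask(d: ExecutorDriver, task: TaskInfo) {
    // Report the task as finished immediately; a real executor would do work first.
    d.sendStatusUpdate(TaskStatus.newBuilder()
      .setTaskId(task.getTaskId)
      .setState(TaskState.TASK_FINISHED)
      .build())
  }

  override def killTask(d: ExecutorDriver, taskId: TaskID) {}
  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
  override def shutdown(d: ExecutorDriver) {}
  override def error(d: ExecutorDriver, message: String) {
    // 0.9.0 drops the integer error code that the old callback carried.
    System.err.println("Error from Mesos: " + message)
  }
}

object SketchExecutor {
  def main(args: Array[String]) {
    MesosNativeLibrary.load() // same entry point the commit switches to
    new MesosExecutorDriver(new SketchExecutor).run()
  }
}
```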
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 0d68470c03..b7b0361c62 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,9 +8,9 @@ import org.apache.mesos.Protos._
* callbacks.
*/
abstract class Job(val runId: Int, val jobId: Int) {
- def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
+ def slaveOffer(s: Offer, availableCpus: Double): Option[TaskInfo]
def statusUpdate(t: TaskStatus): Unit
- def error(code: Int, message: String): Unit
+ def error(message: String): Unit
}
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 755e001106..a7711e0d35 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -76,6 +76,9 @@ private class MesosScheduler(
// URIs of JARs to pass to executor
var jarUris: String = ""
+ // Create an ExecutorInfo for our tasks
+ val executorInfo = createExecutorInfo()
+
// Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
private val jobOrdering = new Ordering[Job] {
override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
@@ -94,16 +97,12 @@ private class MesosScheduler(
}
override def start() {
- if (sc.jars.size > 0) {
- // If the user added any JARS to the SparkContext, create an HTTP server
- // to serve them to our executors
- createJarServer()
- }
new Thread("Spark scheduler") {
setDaemon(true)
override def run {
val sched = MesosScheduler.this
- driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+ driver = new MesosSchedulerDriver(sched, fwInfo, master)
try {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
@@ -114,33 +113,40 @@ private class MesosScheduler(
}.start
}
- def getExecutorInfo(): ExecutorInfo = {
+ def createExecutorInfo(): ExecutorInfo = {
val sparkHome = sc.getSparkHome match {
case Some(path) => path
case None =>
throw new SparkException("Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor")
+ "property, the SPARK_HOME environment variable or the SparkContext constructor")
+ }
+ // If the user added JARs to the SparkContext, create an HTTP server to ship them to executors
+ if (sc.jars.size > 0) {
+ createJarServer()
}
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
- val params = Params.newBuilder()
+ val environment = Environment.newBuilder()
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
- params.addParam(Param.newBuilder()
- .setKey("env." + key)
- .setValue(System.getenv(key))
- .build())
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(System.getenv(key))
+ .build())
}
}
val memory = Resource.newBuilder()
.setName("mem")
- .setType(Resource.Type.SCALAR)
- .setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+ .build()
+ val command = CommandInfo.newBuilder()
+ .setValue(execScript)
+ .setEnvironment(environment)
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default").build())
- .setUri(execScript)
+ .setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
- .setParams(params.build())
.addResources(memory)
.build()
}
@@ -169,7 +175,7 @@ private class MesosScheduler(
}
}
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID) {
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized {
isRegistered = true
@@ -185,6 +191,10 @@ private class MesosScheduler(
}
}
+ override def disconnected(d: SchedulerDriver) {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active jobs for
* tasks in FIFO order. We fill each node with tasks in a round-robin manner so that tasks are
@@ -192,7 +202,7 @@ private class MesosScheduler(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
- val tasks = offers.map(o => new JArrayList[TaskDescription])
+ val tasks = offers.map(o => new JArrayList[TaskInfo])
val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
val enoughMem = offers.map(o => {
val mem = getResource(o.getResourcesList(), "mem")
@@ -279,14 +289,14 @@ private class MesosScheduler(
}
}
- override def error(d: SchedulerDriver, code: Int, message: String) {
- logError("Mesos error: %s (error code: %d)".format(message, code))
+ override def error(d: SchedulerDriver, message: String) {
+ logError("Mesos error: " + message)
synchronized {
if (activeJobs.size > 0) {
// Have each job throw a SparkException with the error
for ((jobId, activeJob) <- activeJobs) {
try {
- activeJob.error(code, message)
+ activeJob.error(message)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
@@ -365,14 +375,18 @@ private class MesosScheduler(
override def frameworkMessage(
d: SchedulerDriver,
- s: SlaveID,
e: ExecutorID,
+ s: SlaveID,
b: Array[Byte]) {}
override def slaveLost(d: SchedulerDriver, s: SlaveID) {
slavesWithExecutors.remove(s.getValue)
}
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+ slavesWithExecutors.remove(s.getValue)
+ }
+
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
}
@@ -397,4 +411,4 @@ object MesosScheduler {
(lower.toLong / 1024 / 1024).toInt
}
}
-} \ No newline at end of file
+}
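
On the scheduler side, the same API shift shows up in two places: the driver is now constructed from a `FrameworkInfo` rather than a bare framework name, and the executor is described by a `CommandInfo` plus `Environment` instead of a URI and `Params`, with resources typed via `Value.Type.SCALAR`. A hedged sketch of that construction, with placeholder values where Spark actually reads configuration:

```scala
import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
import org.apache.mesos.Protos._

// Sketch only: the "default" executor ID mirrors the commit, but the paths,
// environment variables, and memory figure are placeholders, and the scheduler
// callbacks themselves are omitted.
object SchedulerSketch {
  def buildExecutorInfo(execScript: String, memoryMb: Int): ExecutorInfo = {
    // Environment variables to forward to executors go into CommandInfo's Environment.
    val environment = Environment.newBuilder()
      .addVariables(Environment.Variable.newBuilder()
        .setName("SPARK_MEM").setValue(memoryMb + "m").build())
      .build()
    val memory = Resource.newBuilder()
      .setName("mem")
      .setType(Value.Type.SCALAR) // Value.Type replaces Resource.Type in 0.9.0
      .setScalar(Value.Scalar.newBuilder().setValue(memoryMb.toDouble).build())
      .build()
    val command = CommandInfo.newBuilder()
      .setValue(execScript)       // replaces the old setUri(execScript)
      .setEnvironment(environment)
      .build()
    ExecutorInfo.newBuilder()
      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
      .setCommand(command)
      .addResources(memory)
      .build()
  }

  def startDriver(sched: Scheduler, frameworkName: String, master: String): MesosSchedulerDriver = {
    // 0.9.0 drivers take a FrameworkInfo rather than a plain framework name string.
    val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
    new MesosSchedulerDriver(sched, fwInfo, master)
  }
}
```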
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 0f36699d26..01c7efff1e 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -144,7 +144,7 @@ class SimpleJob(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
+ def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskInfo] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -170,8 +170,8 @@ class SimpleJob(
// Create and return the Mesos task object
val cpuRes = Resource.newBuilder()
.setName("cpus")
- .setType(Resource.Type.SCALAR)
- .setScalar(Resource.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
.build()
val startTime = System.currentTimeMillis
@@ -182,9 +182,10 @@ class SimpleJob(
.format(jobId, index, serializedTask.size, timeTaken, ser.getClass.getName))
val taskName = "task %d:%d".format(jobId, index)
- return Some(TaskDescription.newBuilder()
+ return Some(TaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId)
+ .setExecutor(sched.executorInfo)
.setName(taskName)
.addResources(cpuRes)
.setData(ByteString.copyFrom(serializedTask))
@@ -301,9 +302,9 @@ class SimpleJob(
}
}
- def error(code: Int, message: String) {
+ def error(message: String) {
// Save the error message
- abort("Mesos error: %s (error code: %d)".format(message, code))
+ abort("Mesos error: " + message)
}
def abort(message: String) {
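
Finally, `SimpleJob.slaveOffer` now returns a `TaskInfo` that explicitly references the shared `ExecutorInfo`. A small sketch of that construction, with the serialized task, task ID, and executor passed in as placeholders:

```scala
import com.google.protobuf.ByteString
import org.apache.mesos.Protos._

// Sketch of building a 0.9.0 TaskInfo the way slaveOffer now does; in Spark the
// executorInfo is the scheduler's shared instance and serializedTask is the
// serialized Task closure.
object TaskInfoSketch {
  def buildTask(offer: Offer, taskId: TaskID, executorInfo: ExecutorInfo,
      serializedTask: Array[Byte], cpusPerTask: Double): TaskInfo = {
    val cpuRes = Resource.newBuilder()
      .setName("cpus")
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(cpusPerTask).build())
      .build()
    TaskInfo.newBuilder()
      .setTaskId(taskId)
      .setSlaveId(offer.getSlaveId)
      .setExecutor(executorInfo) // TaskInfo now references the executor explicitly
      .setName("sketch task")
      .addResources(cpuRes)
      .setData(ByteString.copyFrom(serializedTask))
      .build()
  }
}
```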
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bcc18e44fe..6e019d6e7f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.mesos.MesosNativeLibrary
+
import spark.broadcast._
class SparkContext(
@@ -72,7 +74,7 @@ class SparkContext(
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
new LocalScheduler(threads.toInt, maxFailures.toInt)
case _ =>
- System.loadLibrary("mesos")
+ MesosNativeLibrary.load()
new MesosScheduler(this, master, frameworkName)
}
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4a1767a057..08c5a990b4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -4,6 +4,9 @@ import sbtassembly.Plugin._
import AssemblyKeys._
object SparkBuild extends Build {
+ // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
+ // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
+ val HADOOP_VERSION = "0.20.205.0"
lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
@@ -41,7 +44,8 @@ object SparkBuild extends Build {
name := "spark-core",
resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"
+ "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+ "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "11.0.1",
@@ -49,9 +53,9 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"com.ning" % "compress-lzf" % "0.8.4",
- "org.apache.hadoop" % "hadoop-core" % "0.20.2",
+ "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
"asm" % "asm-all" % "3.3.1",
- "com.google.protobuf" % "protobuf-java" % "2.3.0",
+ "com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.9",
"org.jboss.netty" % "netty" % "3.2.6.Final",
"it.unimi.dsi" % "fastutil" % "6.4.2"
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index b8442238b8..15ebf0c9b8 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -119,7 +119,7 @@ class ReplSuite extends FunSuite {
assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
}
- if (System.getenv("MESOS_HOME") != null) {
+ if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
test ("running on Mesos") {
val output = runInterpreter("localquiet", """
var v = 7
diff --git a/run b/run
index c7fc65b5f6..2bc025ec0b 100755
--- a/run
+++ b/run
@@ -13,22 +13,22 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
+# If the user specifies a Mesos JAR, put it before our included one on the classpath
MESOS_CLASSPATH=""
-MESOS_LIBRARY_PATH=""
-
-if [ "x$MESOS_HOME" != "x" ] ; then
- MESOS_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar"
- MESOS_LIBRARY_PATH="$MESOS_HOME/lib/java"
+if [ "x$MESOS_JAR" != "x" ] ; then
+ MESOS_CLASSPATH="$MESOS_JAR"
fi
+# Figure out how much memory to use per executor and set it as an environment
+# variable so that our process sees it and can report it to Mesos
if [ "x$SPARK_MEM" == "x" ] ; then
SPARK_MEM="512m"
fi
-export SPARK_MEM # So that the process sees it and can report it to Mesos
+export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/lib:$FWDIR/src/main/native:$MESOS_LIBRARY_PATH"
+JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e $FWDIR/conf/java-opts ] ; then
@@ -36,35 +36,37 @@ if [ -e $FWDIR/conf/java-opts ] ; then
fi
export JAVA_OPTS
-CORE_DIR=$FWDIR/core
-REPL_DIR=$FWDIR/repl
-EXAMPLES_DIR=$FWDIR/examples
-BAGEL_DIR=$FWDIR/bagel
+CORE_DIR="$FWDIR/core"
+REPL_DIR="$FWDIR/repl"
+EXAMPLES_DIR="$FWDIR/examples"
+BAGEL_DIR="$FWDIR/bagel"
# Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes:$MESOS_CLASSPATH"
-CLASSPATH+=:$FWDIR/conf
-CLASSPATH+=:$REPL_DIR/target/scala-$SCALA_VERSION/classes
-CLASSPATH+=:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes
+CLASSPATH="$SPARK_CLASSPATH"
+CLASSPATH+=":$MESOS_CLASSPATH"
+CLASSPATH+=":$FWDIR/conf"
+CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $CORE_DIR/lib -name '*jar'`; do
- CLASSPATH+=:$jar
+ CLASSPATH+=":$jar"
done
for jar in `find $FWDIR/lib_managed/jars -name '*jar'`; do
- CLASSPATH+=:$jar
+ CLASSPATH+=":$jar"
done
for jar in `find $FWDIR/lib_managed/bundles -name '*jar'`; do
- CLASSPATH+=:$jar
+ CLASSPATH+=":$jar"
done
for jar in `find $REPL_DIR/lib -name '*jar'`; do
- CLASSPATH+=:$jar
+ CLASSPATH+=":$jar"
done
CLASSPATH+=:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes
export CLASSPATH # Needed for spark-shell
if [ -n "$SCALA_HOME" ]; then
- SCALA=${SCALA_HOME}/bin/scala
+ SCALA="${SCALA_HOME}/bin/scala"
else
SCALA=scala
fi
-exec $SCALA -cp $CLASSPATH "$@"
+exec "$SCALA" -cp "$CLASSPATH" "$@"