aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala32
2 files changed, 37 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d252fe8595..79c9051e88 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,6 +30,7 @@ import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
ExecutorInfo => MesosExecutorInfo, _}
+import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler._
@@ -123,14 +124,15 @@ private[spark] class MesosSchedulerBackend(
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
+ val executorBackendName = classOf[MesosExecutorBackend].getName
if (uri == null) {
- val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath
- command.setValue("%s %s".format(prefixEnv, executorPath))
+ val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
+ command.setValue(s"$prefixEnv $executorPath $executorBackendName")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv))
+ command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val cpus = Resource.newBuilder()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 78a30a40bf..073814c127 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.mesos
+import org.apache.spark.executor.MesosExecutorBackend
import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
@@ -37,6 +38,37 @@ import scala.collection.mutable.ArrayBuffer
class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
+ test("check spark-class location correctly") {
+ val conf = new SparkConf
+ conf.set("spark.mesos.executor.home" , "/mesos-home")
+
+ val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
+ listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+ EasyMock.replay(listenerBus)
+
+ val sc = EasyMock.createMock(classOf[SparkContext])
+ EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
+ EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
+ EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
+ EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
+ EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
+ EasyMock.replay(sc)
+ val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+ EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
+ EasyMock.replay(taskScheduler)
+
+ val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ // uri is null.
+ val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+ assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+
+ // uri exists.
+ conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
+ val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+ assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+ }
+
test("mesos resource offers result in launching tasks") {
def createOffer(id: Int, mem: Int, cpu: Int) = {
val builder = Offer.newBuilder()