diff options
author | Jongyoul Lee <jongyoul@gmail.com> | 2015-01-19 02:01:56 -0800 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-01-19 02:01:56 -0800 |
commit | 4a4f9ccba2b42b64356db7f94ed9019212fc7317 (patch) | |
tree | 2e5d3fdec12850b5cc845018d1f416c560aae325 /core | |
parent | 3453d578ad9933be6881488c8ca3611e5b686af9 (diff) | |
download | spark-4a4f9ccba2b42b64356db7f94ed9019212fc7317.tar.gz spark-4a4f9ccba2b42b64356db7f94ed9019212fc7317.tar.bz2 spark-4a4f9ccba2b42b64356db7f94ed9019212fc7317.zip |
[SPARK-5088] Use spark-class for running executors directly
Author: Jongyoul Lee <jongyoul@gmail.com>
Closes #3897 from jongyoul/SPARK-5088 and squashes the following commits:
8232aa8 [Jongyoul Lee] [SPARK-5088] Use spark-class for running executors directly - Added a listenerBus for fixing test cases
932289f [Jongyoul Lee] [SPARK-5088] Use spark-class for running executors directly - Rebased from master
613cb47 [Jongyoul Lee] [SPARK-5088] Use spark-class for running executors directly - Fixed code if spark.executor.uri doesn't have any value - Added test cases
ff57bda [Jongyoul Lee] [SPARK-5088] Use spark-class for running executors directly - Adjusted orders of import
97e4bd4 [Jongyoul Lee] [SPARK-5088] Use spark-class for running executors directly - Changed command for using spark-class directly - Delete sbin/spark-executor and moved some codes into spark-class' case statement
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala | 32 |
2 files changed, 37 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index d252fe8595..79c9051e88 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -30,6 +30,7 @@ import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, ExecutorInfo => MesosExecutorInfo, _} +import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.{Logging, SparkContext, SparkException, TaskState} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler._ @@ -123,14 +124,15 @@ private[spark] class MesosSchedulerBackend( val command = CommandInfo.newBuilder() .setEnvironment(environment) val uri = sc.conf.get("spark.executor.uri", null) + val executorBackendName = classOf[MesosExecutorBackend].getName if (uri == null) { - val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath - command.setValue("%s %s".format(prefixEnv, executorPath)) + val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath + command.setValue(s"$prefixEnv $executorPath $executorBackendName") } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.split('/').last.split('.').head - command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv)) + command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName") command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } val cpus = Resource.newBuilder() diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 78a30a40bf..073814c127 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.mesos +import org.apache.spark.executor.MesosExecutorBackend import org.scalatest.FunSuite import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus, @@ -37,6 +38,37 @@ import scala.collection.mutable.ArrayBuffer class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { + test("check spark-class location correctly") { + val conf = new SparkConf + conf.set("spark.mesos.executor.home" , "/mesos-home") + + val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) + listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2))) + EasyMock.replay(listenerBus) + + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes() + EasyMock.expect(sc.conf).andReturn(conf).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.listenerBus).andReturn(listenerBus) + EasyMock.replay(sc) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() + EasyMock.replay(taskScheduler) + + val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + // uri is null. + val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id") + assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") + + // uri exists. + conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") + val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id") + assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") + } + test("mesos resource offers result in launching tasks") { def createOffer(id: Int, mem: Int, cpu: Int) = { val builder = Offer.newBuilder() |