1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
object Utils {
def createOffer(
offerId: String,
slaveId: String,
mem: Int,
cpus: Int,
ports: Option[(Long, Long)] = None,
gpus: Int = 0): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(mem))
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpus))
ports.foreach { resourcePorts =>
builder.addResourcesBuilder()
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
}
if (gpus > 0) {
builder.addResourcesBuilder()
.setName("gpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(gpus))
}
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(slaveId))
.setHostname(s"host${slaveId}")
.build()
}
def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
verify(driver, times(1)).launchTasks(
Matchers.eq(Collections.singleton(createOfferId(offerId))),
captor.capture())
captor.getValue.asScala.toList
}
def createOfferId(offerId: String): OfferID = {
OfferID.newBuilder().setValue(offerId).build()
}
def createSlaveId(slaveId: String): SlaveID = {
SlaveID.newBuilder().setValue(slaveId).build()
}
def createExecutorId(executorId: String): ExecutorID = {
ExecutorID.newBuilder().setValue(executorId).build()
}
def createTaskId(taskId: String): TaskID = {
TaskID.newBuilder().setValue(taskId).build()
}
}
|