blob: 941ba7a3f185a420e1fa54bcbc2050851f7d47a3 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
 * A Schedulable entity that represents a collection of Pools or TaskSetManagers.
 *
 * A pool keeps its children in insertion order, indexes them by name, and
 * delegates most Schedulable operations to them. The order in which children
 * are visited by `getSortedTaskSetQueue` is decided by the scheduling
 * algorithm selected from `schedulingMode`.
 *
 * @param poolName       name of this pool; also exposed through `name`
 * @param schedulingMode selects the algorithm used to order the children
 * @param initMinShare   initial value of `minShare`
 * @param initWeight     initial value of `weight`
 */
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable
  with Logging {

  // Direct children, in the order they were added.
  var schedulableQueue = new ArrayBuffer[Schedulable]
  // Direct children indexed by their `name`.
  var schedulableNameToSchedulable = new HashMap[String, Schedulable]

  var weight = initWeight
  var minShare = initMinShare
  // Maintained by increaseRunningTasks / decreaseRunningTasks.
  var runningTasks = 0

  // Initialised to 0 and never modified inside this class.
  var priority = 0
  var stageId = 0
  var name = poolName
  // Set by the parent's addSchedulable; null while this pool has no parent.
  var parent:Schedulable = null

  // Comparator provider used to order children in getSortedTaskSetQueue.
  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

  /** Registers `schedulable` as a direct child and makes this pool its parent. */
  override def addSchedulable(schedulable: Schedulable): Unit = {
    schedulableQueue += schedulable
    schedulableNameToSchedulable(schedulable.name) = schedulable
    schedulable.parent = this
  }

  /**
   * Removes `schedulable` from the child queue and from the name index.
   * The child's `parent` reference is left untouched.
   */
  override def removeSchedulable(schedulable: Schedulable): Unit = {
    schedulableQueue -= schedulable
    schedulableNameToSchedulable -= schedulable.name
  }

  /**
   * Looks up a schedulable by name, first among the direct children and then
   * recursively inside each child in queue order.
   *
   * @return the first match, or null when nothing with that name is found
   */
  override def getSchedulableByName(schedulableName: String): Schedulable = {
    schedulableNameToSchedulable.get(schedulableName) match {
      case Some(directChild) =>
        directChild
      case None =>
        // The iterator is lazy, so children are queried in order and the
        // search stops at the first non-null answer, without relying on a
        // non-local `return` from inside a closure.
        schedulableQueue.iterator
          .map(_.getSchedulableByName(schedulableName))
          .find(_ != null)
          .orNull
    }
  }

  /** Forwards the executor-lost notification to every direct child. */
  override def executorLost(executorId: String, host: String): Unit = {
    schedulableQueue.foreach(_.executorLost(executorId, host))
  }

  /**
   * Asks every direct child to check for speculatable tasks.
   *
   * @return true if at least one child returned true
   */
  override def checkSpeculatableTasks(): Boolean = {
    // Non-short-circuit `|` on purpose: every child must be asked, even after
    // one of them has already answered true.
    schedulableQueue.foldLeft(false) { (shouldRevive, schedulable) =>
      shouldRevive | schedulable.checkSpeculatableTasks()
    }
  }

  /**
   * Sorts the direct children with the scheduling algorithm's comparator and
   * concatenates their sorted task set queues in that order.
   */
  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
    val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
    sortedSchedulableQueue.foreach { schedulable =>
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
    }
    sortedTaskSetQueue
  }

  /** Adds `taskNum` to this pool's running-task count and to its parent's, if any. */
  override def increaseRunningTasks(taskNum: Int): Unit = {
    runningTasks += taskNum
    if (parent != null) {
      parent.increaseRunningTasks(taskNum)
    }
  }

  /** Subtracts `taskNum` from this pool's running-task count and from its parent's, if any. */
  override def decreaseRunningTasks(taskNum: Int): Unit = {
    runningTasks -= taskNum
    if (parent != null) {
      parent.decreaseRunningTasks(taskNum)
    }
  }

  /** Returns true if at least one direct child reports pending tasks. */
  override def hasPendingTasks(): Boolean = {
    schedulableQueue.exists(_.hasPendingTasks())
  }
}
|