aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
blob: 18cc15c2a5ee9e42fa47617c3e79344a7a7f6348 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package spark.scheduler.cluster

import java.io.{File, FileInputStream, FileOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.util.control.Breaks._
import scala.xml._

import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode

import java.util.Properties

/**
 * Contract for the objects that assemble the Schedulable tree.
 *
 * Implementations populate the tree in two steps: `buildPools` creates the
 * tree nodes (pools) and `addTaskSetManager` attaches the leaf nodes
 * (TaskSetManagers).
 */
private[spark] trait SchedulableBuilder {

  /** Builds the tree nodes (pools). */
  def buildPools(): Unit

  /**
   * Adds a leaf node to the tree.
   *
   * @param manager    the Schedulable (a TaskSetManager) to attach
   * @param properties properties handed to the builder together with the manager;
   *                   how they are interpreted is up to the implementation
   */
  def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}

/**
 * SchedulableBuilder that creates no pools of its own: every manager is
 * attached directly to `rootPool`.
 *
 * @param rootPool the pool that receives all managers
 */
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {

  /** Intentionally a no-op: this builder has no pools to set up. */
  override def buildPools(): Unit = ()

  /**
   * Attaches `manager` directly under the root pool.
   *
   * @param manager    the Schedulable to add
   * @param properties ignored by this builder
   */
  override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit =
    rootPool.addSchedulable(manager)
}

/**
 * SchedulableBuilder that places pools under `rootPool` and attaches each
 * manager to one of those pools.
 *
 * Pools come from three places:
 *  - `pool` elements of the XML allocation file named by the
 *    `spark.fairscheduler.allocation.file` system property (if that file exists),
 *  - a pool named "default", which `buildPools` always guarantees,
 *  - pools created on demand by `addTaskSetManager` when a manager asks for a
 *    pool name that does not exist yet.
 *
 * @param rootPool the pool under which all other pools are added
 */
private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {

  // Path of the XML allocation file; "unspecified" when the system property is not set.
  val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
  // Key looked up in a manager's Properties to find the name of its target pool.
  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
  val DEFAULT_POOL_NAME = "default"
  // Names of the child elements read from each pool node of the allocation file.
  val MINIMUM_SHARES_PROPERTY = "minShare"
  val SCHEDULING_MODE_PROPERTY = "schedulingMode"
  val WEIGHT_PROPERTY = "weight"
  // Attribute selector for the pool's name on a pool node.
  val POOL_NAME_PROPERTY = "@name"
  // Element name that identifies a pool node in the allocation file.
  val POOLS_PROPERTY = "pool"
  // Values used when the allocation file omits a setting or supplies an unusable one.
  val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
  val DEFAULT_MINIMUM_SHARE = 2
  val DEFAULT_WEIGHT = 1

  /**
   * Creates one pool per `pool` node found in the allocation file (when the file
   * exists) and then makes sure a pool named "default" is present.
   *
   * Settings that are missing, blank or unparsable fall back to the
   * corresponding DEFAULT_* value instead of aborting pool construction.
   */
  override def buildPools(): Unit = {
    val file = new File(schedulerAllocFile)
    if (file.exists()) {
      val xml = XML.loadFile(file)
      for (poolNode <- (xml \\ POOLS_PROPERTY)) {
        val poolName = (poolNode \ POOL_NAME_PROPERTY).text
        val schedulingMode = parseSchedulingMode((poolNode \ SCHEDULING_MODE_PROPERTY).text)
        val minShare = parseIntSetting(
          (poolNode \ MINIMUM_SHARES_PROPERTY).text, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
        val weight = parseIntSetting(
          (poolNode \ WEIGHT_PROPERTY).text, WEIGHT_PROPERTY, DEFAULT_WEIGHT)

        val pool = new Pool(poolName, schedulingMode, minShare, weight)
        rootPool.addSchedulable(pool)
        logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
          poolName, schedulingMode, minShare, weight))
      }
    }

    // Finally create the "default" pool unless the allocation file already defined it.
    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
      rootPool.addSchedulable(pool)
      logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
        DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
    }
  }

  /**
   * Attaches `manager` to the pool named by FAIR_SCHEDULER_PROPERTIES in
   * `properties`, or to the "default" pool when `properties` is null or does
   * not contain that key. A pool that does not exist yet is created with the
   * default settings and added to the root pool first.
   *
   * @param manager    the Schedulable to add
   * @param properties may be null; optionally carries the target pool name
   */
  override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
    val poolName =
      if (properties != null) {
        properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      } else {
        DEFAULT_POOL_NAME
      }

    // getSchedulableByName signals "not found" with null. The create-if-missing
    // step is applied on every path, so a null `properties` combined with a
    // missing default pool no longer ends in a NullPointerException.
    val parentPool = Option(rootPool.getSchedulableByName(poolName)).getOrElse {
      // The pool was configured by the user in the app instead of being defined in the xml file.
      val newPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
      rootPool.addSchedulable(newPool)
      logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
        poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
      newPool
    }

    parentPool.addSchedulable(manager)
    logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
  }

  /**
   * Resolves the scheduling mode named in the allocation file.
   * Returns DEFAULT_SCHEDULING_MODE when the text is blank or is not a known mode name.
   */
  private def parseSchedulingMode(rawValue: String) = {
    val modeName = rawValue.trim
    if (modeName.isEmpty) {
      DEFAULT_SCHEDULING_MODE
    } else {
      try {
        SchedulingMode.withName(modeName)
      } catch {
        case e: Exception =>
          logInfo("Error xml schedulingMode, using default schedulingMode")
          DEFAULT_SCHEDULING_MODE
      }
    }
  }

  /**
   * Parses an integer setting read from the allocation file.
   * Returns `defaultValue` when the text is blank or is not a valid integer, so
   * a single bad value cannot abort the construction of every pool.
   *
   * @param rawValue     text content of the setting's element ("" when the element is absent)
   * @param propertyName name of the setting, used only in the log message
   * @param defaultValue value to fall back to
   */
  private def parseIntSetting(rawValue: String, propertyName: String, defaultValue: Int): Int = {
    // Trim first: surrounding whitespace in the element text would otherwise make toInt fail.
    val text = rawValue.trim
    if (text.isEmpty) {
      defaultValue
    } else {
      try {
        text.toInt
      } catch {
        case e: NumberFormatException =>
          logInfo("Error xml %s:%s, using default %s:%d".format(
            propertyName, text, propertyName, defaultValue))
          defaultValue
      }
    }
  }
}