aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
blob: 10b5a7f57a802537b717742deda4a9655e345132 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.scheduler

import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.streaming.receiver.Receiver

/**
 * A class that tries to schedule receivers so that they are evenly distributed. There are two
 * phases for scheduling receivers.
 *
 * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
 *   all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
 *   It will try to distribute the receivers evenly. ReceiverTracker should update its
 *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
 *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will be set to an executor list
 *   that contains the scheduled locations. Then when a receiver is starting, it will send a
 *   register request and `ReceiverTracker.registerReceiver` will be called. In
 *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
 *   if the location of this receiver is one of the scheduled executors; if not, the registration
 *   will be rejected.
 * - The second phase is local scheduling when a receiver is restarting. There are two cases of
 *   receiver restarting:
 *   - If a receiver is restarting because it's rejected due to the real location and the scheduled
 *     executors mismatching, in other words, it fails to start in one of the locations that
 *     `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
 *     still alive in the list of scheduled executors, then use them to launch the receiver job.
 *   - If a receiver is restarting without a scheduled executors list, or the executors in the list
 *     are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
 *     not set `ReceiverTrackingInfo.scheduledExecutors` for this receiver, instead, it should clear
 *     it. Then when this receiver is registering, we can know this is a local scheduling, and
 *     `ReceiverTracker` should call `rescheduleReceiver` again to check if the launching
 *     location is matching.
 *
 * In conclusion, we should make a global schedule and try to follow it exactly for as long as
 * possible; otherwise, fall back to local scheduling.
 */
private[streaming] class ReceiverSchedulingPolicy {

  /**
   * Try our best to distribute receivers evenly among the executors. However, if the
   * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
   * because we have to respect them.
   *
   * Here is the approach to schedule executors:
   * <ol>
   *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
   *       executors running on those hosts.</li>
   *   <li>Then, schedule all other receivers evenly among all the executors such that overall
   *       distribution over all the receivers is even.</li>
   *   <li>Finally, hand every executor that still has no receiver to the receiver that currently
   *       has the fewest candidate executors.</li>
   * </ol>
   *
   * This method is called when we start to launch receivers for the first time.
   *
   * @param receivers the receivers to schedule
   * @param executors the available executors, each in "host:port" form
   * @return a map from each receiver's `streamId` to its candidate executors. The map is empty
   *         when `receivers` is empty; every receiver maps to an empty list when `executors` is
   *         empty.
   */
  def scheduleReceivers(
      receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
    if (receivers.isEmpty) {
      return Map.empty
    }

    if (executors.isEmpty) {
      return receivers.map(_.streamId -> Seq.empty).toMap
    }

    // Note: preferredLocation is a host but executors are host:port, so index executors by host
    val hostToExecutors = executors.groupBy(_.split(":")(0))
    // scheduledExecutors(i) collects the candidate executors of receivers(i)
    val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
    // Set the initial value to 0
    executors.foreach(e => numReceiversOnExecutor(e) = 0)

    // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
    // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
    // Receivers and their candidate buffers are walked in lock step rather than by index, so the
    // pass stays linear even when `receivers` is a linked list.
    receivers.zip(scheduledExecutors).foreach { case (receiver, candidates) =>
      receiver.preferredLocation.foreach { host =>
        hostToExecutors.get(host) match {
          case Some(executorsOnHost) =>
            // preferredLocation is a known host. Select an executor that has the least receivers in
            // this host
            val leastScheduledExecutor =
              executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
            candidates += leastScheduledExecutor
            numReceiversOnExecutor(leastScheduledExecutor) =
              numReceiversOnExecutor(leastScheduledExecutor) + 1
          case None =>
            // preferredLocation is an unknown host.
            // Note: There are two cases:
            // 1. This executor is not up. But it may be up later.
            // 2. This executor is dead, or it's not a host in the cluster.
            // Currently, simply add host to the scheduled executors.
            candidates += host
        }
      }
    }

    // For those receivers that don't have preferredLocation, make sure we assign at least one
    // executor to them.
    for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
      // Select the executor that has the least receivers
      val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
      scheduledExecutorsForOneReceiver += leastScheduledExecutor
      numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
    }

    // Assign idle executors to receivers that have less executors
    val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
    for (executor <- idleExecutors) {
      // Assign an idle executor to the receiver that has least candidate executors.
      val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
      leastScheduledExecutors += executor
    }

    receivers.map(_.streamId).zip(scheduledExecutors).toMap
  }

  /**
   * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
   * run this receiver in arbitrary executor.
   *
   * This method tries to balance executors' load. Here is the approach to schedule executors
   * for a receiver.
   * <ol>
   *   <li>
   *     If preferredLocation is set, preferredLocation should be one of the candidate executors.
   *   </li>
   *   <li>
   *     Every executor will be assigned to a weight according to the receivers running or
   *     scheduling on it.
   *     <ul>
   *       <li>
   *         If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
   *       </li>
   *       <li>
   *         If a receiver is scheduled to an executor but has not yet run, it contributes
   *         `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
   *     </ul>
   *     At last, if there are any idle executors (weight = 0), returns all idle executors.
   *     Otherwise, returns the executors in `executors` that have the minimum weight.
   *   </li>
   * </ol>
   *
   * Apart from `preferredLocation`, every returned candidate is an element of `executors`:
   * locations that only appear in `receiverTrackingInfoMap` (for example executors that are gone,
   * or bare host names recorded for an unknown preferred host) are never returned.
   *
   * This method is called when a receiver is registering with ReceiverTracker or is restarting.
   *
   * @param receiverId id of the receiver being rescheduled (not consulted by this implementation)
   * @param preferredLocation the preferred host of the receiver, if any; always a candidate
   * @param receiverTrackingInfoMap tracking info of the known receivers, used to compute the load
   *                                of each executor
   * @param executors the currently available executors
   * @return the candidate executors, or an empty list when `executors` is empty
   */
  def rescheduleReceiver(
      receiverId: Int,
      preferredLocation: Option[String],
      receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
      executors: Seq[String]): Seq[String] = {
    if (executors.isEmpty) {
      return Seq.empty
    }

    // Always try to schedule to the preferred locations
    val scheduledExecutors = mutable.Set[String]()
    scheduledExecutors ++= preferredLocation

    // Sum weights for each location. The sums are computed eagerly (`map`, not the lazy
    // `mapValues` view) so they are not recomputed on every lookup.
    val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
      receiverTrackingInfo.state match {
        case ReceiverState.INACTIVE => Nil
        case ReceiverState.SCHEDULED =>
          // NOTE(review): assumes a SCHEDULED receiver always has its scheduled executors set;
          // `.get` throws otherwise.
          val candidates = receiverTrackingInfo.scheduledExecutors.get
          // The probability that a scheduled receiver will run in an executor is
          // 1.0 / candidates.size
          candidates.map(location => location -> (1.0 / candidates.size))
        // NOTE(review): assumes an ACTIVE receiver always has a running executor recorded.
        case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
      }
    }.groupBy(_._1).map { case (location, weights) => location -> weights.map(_._2).sum }

    val liveExecutors = executors.toSet
    val idleExecutors = liveExecutors -- executorWeights.keys
    if (idleExecutors.nonEmpty) {
      scheduledExecutors ++= idleExecutors
    } else {
      // There is no idle executor. So select all executors that have the minimum weight.
      // Only executors that are actually available may be chosen: the weight table can also
      // contain locations that are not in "executors", and those must not win the minimum.
      val liveWeights = executorWeights.filter { case (location, _) =>
        liveExecutors.contains(location)
      }
      if (liveWeights.nonEmpty) {
        val minWeight = liveWeights.values.min
        scheduledExecutors ++= liveWeights.filter(_._2 == minWeight).keys
      } else {
        // This should not happen: "executors" is not empty and none of them is idle, so each of
        // them has a weight.
      }
    }
    scheduledExecutors.toSeq
  }
}